From 48202068aca6b55b249db46f0f68d4a560878d01 Mon Sep 17 00:00:00 2001 From: solfe Date: Mon, 15 Jun 2026 22:47:42 +0900 Subject: [PATCH 1/8] refactor(word): simplify single-flight key dimensions --- .../api/word/service/WordSingleFlightProperties.java | 6 +----- .../word/service/WordSingleFlightRedisCoordinator.java | 6 ++---- src/main/resources/application.properties | 4 +--- ...ordSingleFlightRedisCoordinatorIntegrationTest.java | 10 ++++------ .../service/WordSingleFlightRedisCoordinatorTest.java | 4 +--- 5 files changed, 9 insertions(+), 21 deletions(-) diff --git a/src/main/java/com/linglevel/api/word/service/WordSingleFlightProperties.java b/src/main/java/com/linglevel/api/word/service/WordSingleFlightProperties.java index bc0fe601..867a903c 100644 --- a/src/main/java/com/linglevel/api/word/service/WordSingleFlightProperties.java +++ b/src/main/java/com/linglevel/api/word/service/WordSingleFlightProperties.java @@ -17,9 +17,5 @@ public class WordSingleFlightProperties { private long resultTtlMs = 60_000; - private String promptVersion = "v1"; - - private String model = "default"; - - private String schemaVersion = "v2"; + private String resultSchemaVersion = "v2"; } diff --git a/src/main/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinator.java b/src/main/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinator.java index 1b872881..927a4e2f 100644 --- a/src/main/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinator.java +++ b/src/main/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinator.java @@ -266,13 +266,11 @@ private KeySet buildKeySet(String word, LanguageCode targetLanguage) { String canonicalKey = String.join("|", "word=" + normalizedWord, "lang=" + targetLanguage.getCode(), - "prompt=" + properties.getPromptVersion(), - "model=" + properties.getModel(), - "schema=" + properties.getSchemaVersion() + "resultSchema=" + properties.getResultSchemaVersion() ); String digest = sha256(canonicalKey); - String suffix = properties.getSchemaVersion() + ":" + digest; + String suffix = properties.getResultSchemaVersion() + ":" + digest; return new KeySet( LOCK_PREFIX + ":" + suffix, diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 8c029c5d..e86fd1f7 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -55,9 +55,7 @@ spring.data.redis.ssl.enabled=false word.single-flight.enabled=true word.single-flight.wait-timeout-ms=11000 word.single-flight.result-ttl-ms=25000 -word.single-flight.prompt-version=v1 -word.single-flight.model=${spring.ai.bedrock.converse.chat.options.model:default} -word.single-flight.schema-version=v2 +word.single-flight.result-schema-version=v2 # AWS S3 (AI Input/Output buckets) aws.s3.region=${S3_REGION} diff --git a/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorIntegrationTest.java b/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorIntegrationTest.java index c408abd6..79621b0a 100644 --- a/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorIntegrationTest.java +++ b/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorIntegrationTest.java @@ -37,8 +37,8 @@ class WordSingleFlightRedisCoordinatorIntegrationTest extends AbstractRedisTest @BeforeEach void setUp() { - nodeA = createNode("test-model-a", 3_000); - nodeB = createNode("test-model-a", 3_000); + nodeA = createNode(3_000); + nodeB = createNode(3_000); flushAll(nodeA.template); } @@ -117,7 +117,7 @@ void propagatesLeaderFailureAcrossTwoCoordinatorsUsingRealRedis() { assertThat(aiCalls.get()).isEqualTo(1); } - private CoordinatorFixture createNode(String model, long waitTimeoutMs) { + private CoordinatorFixture createNode(long waitTimeoutMs) { GenericContainer redis = getRedisContainer(); RedisStandaloneConfiguration config = new RedisStandaloneConfiguration(redis.getHost(), redis.getMappedPort(6379)); @@ -142,9 +142,7 @@ private CoordinatorFixture createNode(String model, long waitTimeoutMs) { properties.setEnabled(true); properties.setWaitTimeoutMs(waitTimeoutMs); properties.setResultTtlMs(30_000); - properties.setPromptVersion("v1"); - properties.setModel(model); - properties.setSchemaVersion("v2"); + properties.setResultSchemaVersion("v2"); WordSingleFlightRedisCoordinator coordinator = new WordSingleFlightRedisCoordinator( template, diff --git a/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorTest.java b/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorTest.java index c31ce9b0..7f7e18d1 100644 --- a/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorTest.java +++ b/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorTest.java @@ -72,9 +72,7 @@ void setUp() { properties.setEnabled(true); properties.setWaitTimeoutMs(120); properties.setResultTtlMs(2_000); - properties.setPromptVersion("v1"); - properties.setModel("test-model"); - properties.setSchemaVersion("v2"); + properties.setResultSchemaVersion("v2"); when(stringRedisTemplate.opsForValue()).thenReturn(valueOperations); when(redissonClient.getLock(anyString())).thenReturn(redissonLock); From 7c1950e07422a5e1fba4009e4fb5b0b40306e897 Mon Sep 17 00:00:00 2001 From: solfe Date: Mon, 15 Jun 2026 23:32:50 +0900 Subject: [PATCH 2/8] refactor(word): use DB-backed single-flight completion --- .../api/word/service/WordService.java | 93 ++++---- ...ordSingleFlightLeaderFailureException.java | 19 -- .../service/WordSingleFlightProperties.java | 2 - .../WordSingleFlightRedisCoordinator.java | 183 ++++++---------- src/main/resources/application.properties | 3 +- .../api/word/service/WordServiceTest.java | 80 +++++-- ...FlightRedisCoordinatorIntegrationTest.java | 110 ++++++---- .../WordSingleFlightRedisCoordinatorTest.java | 203 ++++++++---------- 8 files changed, 345 insertions(+), 348 deletions(-) delete mode 100644 src/main/java/com/linglevel/api/word/service/WordSingleFlightLeaderFailureException.java diff --git a/src/main/java/com/linglevel/api/word/service/WordService.java b/src/main/java/com/linglevel/api/word/service/WordService.java index 2b439280..70309579 100644 --- a/src/main/java/com/linglevel/api/word/service/WordService.java +++ b/src/main/java/com/linglevel/api/word/service/WordService.java @@ -50,27 +50,28 @@ public WordSearchResponse getOrCreateWords(String userId, String word, LanguageC log.info("Word '{}' not found for targetLanguage {}, creating new one...", wordVariant.getOriginalForm(), targetLanguage); - List analysisResults; try { - analysisResults = singleFlightCoordinator.execute( + return singleFlightCoordinator.execute( wordVariant.getOriginalForm(), targetLanguage, - () -> wordAiService.analyzeWord( + () -> { + List analysisResults = wordAiService.analyzeWord( wordVariant.getOriginalForm(), targetLanguage.getCode() + ); + Word newWord = convertAnalysisResultToWord(analysisResults.get(0)); + return wordRepository.save(newWord); + }, + () -> wordRepository.findByWordAndTargetLanguageCode( + wordVariant.getOriginalForm(), + targetLanguage ) ); } catch (WordSingleFlightTimeoutException e) { log.warn("Single-flight temporary failure for originalForm '{}'. Returning timeout error.", wordVariant.getOriginalForm(), e); throw new WordsException(WordsErrorCode.WORD_ANALYSIS_TIMEOUT); - } catch (WordSingleFlightLeaderFailureException e) { - throw mapLeaderFailure(wordVariant.getOriginalForm(), e, false); } - - // Word 생성 및 저장 (빈 결과는 WordAiService에서 예외 발생) - Word newWord = convertAnalysisResultToWord(analysisResults.get(0)); - return wordRepository.save(newWord); }); boolean isBookmarked = wordBookmarkRepository.existsByUserIdAndWord(userId, wordVariant.getOriginalForm()); @@ -102,6 +103,9 @@ public List getOrCreateWordEntities(String word, LanguageCode targe // 2. InvalidWord 캐시 확인 - 3회 유예 후 차단 Optional cachedInvalidWord = invalidWordRepository.findByWord(word); + int invalidAttemptCountBeforeSingleFlight = cachedInvalidWord + .map(InvalidWord::getAttemptCount) + .orElse(0); if (cachedInvalidWord.isPresent()) { InvalidWord invalidWord = cachedInvalidWord.get(); if (invalidWord.getAttemptCount() >= 3) { @@ -114,41 +118,63 @@ public List getOrCreateWordEntities(String word, LanguageCode targe // 3. DB에 없으면 AI 호출 (실패 시에도 InvalidWord로 캐싱) log.info("Word '{}' not found in database. Calling AI to analyze...", word); - List analysisResults; try { - analysisResults = singleFlightCoordinator.execute( + return singleFlightCoordinator.execute( word, targetLanguage, - () -> wordAiService.analyzeWord(word, targetLanguage.getCode()) + () -> { + List analysisResults = wordAiService.analyzeWord(word, targetLanguage.getCode()); + + // AI 호출 성공 시 InvalidWord 캐시에서 제거 (일시적 오류였던 경우 복구) + cachedInvalidWord.ifPresent(invalidWord -> { + invalidWordRepository.delete(invalidWord); + log.info("Removed word '{}' from invalid word cache after successful AI analysis (was attempt {}/3)", + word, invalidWord.getAttemptCount()); + }); + + List savedVariants = new ArrayList<>(); + for (WordAnalysisResult analysisResult : analysisResults) { + WordVariant savedVariant = saveWordFromAnalysis(word, analysisResult); + savedVariants.add(savedVariant); + } + + return savedVariants; + }, + () -> findWordVariantsAfterSingleFlight(word, invalidAttemptCountBeforeSingleFlight) ); - // AI 호출 성공 시 InvalidWord 캐시에서 제거 (일시적 오류였던 경우 복구) - cachedInvalidWord.ifPresent(invalidWord -> { - invalidWordRepository.delete(invalidWord); - log.info("Removed word '{}' from invalid word cache after successful AI analysis (was attempt {}/3)", - word, invalidWord.getAttemptCount()); - }); - } catch (WordSingleFlightTimeoutException e) { log.warn("Single-flight temporary failure for word '{}'. Keeping invalid-word cache untouched.", word, e); throw new WordsException(WordsErrorCode.WORD_ANALYSIS_TIMEOUT); - } catch (WordSingleFlightLeaderFailureException e) { - throw mapLeaderFailure(word, e, true); + } catch (WordsException e) { + throw e; } catch (Exception e) { // AI 호출 실패 또는 무의미한 단어인 경우 InvalidWord로 캐싱 log.warn("AI call failed for word '{}'. Caching as invalid word to prevent retries.", word, e); saveInvalidWord(word); throw new WordsException(WordsErrorCode.WORD_IS_MEANINGLESS); } + } - // 4. 트랜잭션 내에서 DB 저장 처리 - List savedVariants = new ArrayList<>(); - for (WordAnalysisResult analysisResult : analysisResults) { - WordVariant savedVariant = saveWordFromAnalysis(word, analysisResult); - savedVariants.add(savedVariant); + private Optional> findWordVariantsAfterSingleFlight( + String word, + int invalidAttemptCountBeforeSingleFlight + ) { + List existingVariants = wordVariantRepository.findAllByWord(word); + if (!existingVariants.isEmpty()) { + return Optional.of(existingVariants); } - return savedVariants; + Optional currentInvalidWord = invalidWordRepository.findByWord(word); + if (currentInvalidWord.isPresent()) { + int currentAttemptCount = currentInvalidWord.get().getAttemptCount(); + + if (currentAttemptCount >= 3 || currentAttemptCount > invalidAttemptCountBeforeSingleFlight) { + throw new WordsException(WordsErrorCode.WORD_IS_MEANINGLESS); + } + } + + return Optional.empty(); } @@ -352,19 +378,6 @@ private WordResponse convertToResponse(Word word, boolean isBookmarked, List>> channelWaiters = new ConcurrentHashMap<>(); @@ -63,19 +59,20 @@ void shutdown() { } - public List execute( + public T execute( String word, LanguageCode targetLanguage, - Supplier> leaderAction + Supplier leaderAction, + Supplier> followerResultLookup ) { if (!properties.isEnabled()) { return leaderAction.get(); } KeySet keys = buildKeySet(word, targetLanguage); - ResultEnvelope cached = readResult(keys.resultKey()); - if (cached != null) { - return unwrap(cached, keys.digest()); + Optional existing = followerResultLookup.get(); + if (existing.isPresent()) { + return existing.get(); } RLock lock = createLock(keys.lockKey()); @@ -84,25 +81,21 @@ public List execute( return executeAsLeader(keys, lock, leaderAction); } - return waitAsFollower(keys); + return waitAsFollower(keys, followerResultLookup); } - private List executeAsLeader( + private T executeAsLeader( KeySet keys, RLock lock, - Supplier> leaderAction + Supplier leaderAction ) { try { - List result = leaderAction.get(); - writeResult(keys.resultKey(), ResultEnvelope.success(result)); - publishDone(keys.channel()); + T result = leaderAction.get(); + completeLeaderAfterCommit(keys, lock); return result; - } catch (RuntimeException e) { - writeResult(keys.resultKey(), ResultEnvelope.failed(e.getMessage(), resolveLeaderErrorCode(e))); - publishDone(keys.channel()); + } catch (RuntimeException | Error e) { + completeLeaderAfterCompletion(keys, lock); throw e; - } finally { - releaseLock(lock, keys.lockKey()); } } @@ -117,40 +110,74 @@ private boolean tryAcquireLeaderLock(RLock lock) { } } - private List waitAsFollower(KeySet keys) { - ResultEnvelope current = readResult(keys.resultKey()); - if (current != null) { - return unwrap(current, keys.digest()); - } - + private T waitAsFollower(KeySet keys, Supplier> followerResultLookup) { CompletableFuture signal = new CompletableFuture<>(); registerWaiter(keys.channel(), signal); try { - ResultEnvelope afterRegister = readResult(keys.resultKey()); - if (afterRegister != null) { - return unwrap(afterRegister, keys.digest()); + Optional afterRegister = followerResultLookup.get(); + if (afterRegister.isPresent()) { + return afterRegister.get(); } signal.get(properties.getWaitTimeoutMs(), TimeUnit.MILLISECONDS); } catch (TimeoutException e) { log.warn("Single-flight wait timed out for key digest={}", keys.digest()); - } catch (Exception e) { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RuntimeException("Single-flight wait interrupted for key digest=" + keys.digest(), e); + } catch (ExecutionException e) { + throw new RuntimeException("Single-flight wait failed for key digest=" + keys.digest(), e); } finally { unregisterWaiter(keys.channel(), signal); } - ResultEnvelope finalResult = readResult(keys.resultKey()); - if (finalResult != null) { - return unwrap(finalResult, keys.digest()); + Optional finalResult = followerResultLookup.get(); + if (finalResult.isPresent()) { + return finalResult.get(); } throw new WordSingleFlightTimeoutException( - "Timed out waiting single-flight result for key digest=" + keys.digest() + "Timed out waiting single-flight DB result for key digest=" + keys.digest() ); } + private void completeLeaderAfterCommit(KeySet keys, RLock lock) { + if (!TransactionSynchronizationManager.isSynchronizationActive()) { + publishDone(keys.channel()); + releaseLock(lock, keys.lockKey()); + return; + } + + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + @Override + public void afterCommit() { + publishDone(keys.channel()); + } + + @Override + public void afterCompletion(int status) { + releaseLock(lock, keys.lockKey()); + } + }); + } + + private void completeLeaderAfterCompletion(KeySet keys, RLock lock) { + if (!TransactionSynchronizationManager.isSynchronizationActive()) { + publishDone(keys.channel()); + releaseLock(lock, keys.lockKey()); + return; + } + + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + @Override + public void afterCompletion(int status) { + publishDone(keys.channel()); + releaseLock(lock, keys.lockKey()); + } + }); + } + private void publishDone(String channel) { stringRedisTemplate.convertAndSend(channel, "done"); } @@ -169,69 +196,6 @@ private RLock createLock(String lockKey) { return redissonClient.getLock(lockKey); } - private ResultEnvelope readResult(String resultKey) { - String raw = stringRedisTemplate.opsForValue().get(resultKey); - if (raw == null) { - return null; - } - - try { - return objectMapper.readValue(raw, ResultEnvelope.class); - } catch (JsonProcessingException e) { - log.warn("Failed to deserialize single-flight result key={}", resultKey, e); - return null; - } - } - - private void writeResult(String resultKey, ResultEnvelope envelope) { - try { - String raw = objectMapper.writeValueAsString(envelope); - stringRedisTemplate.opsForValue().set( - resultKey, - raw, - Duration.ofMillis(properties.getResultTtlMs()) - ); - } catch (JsonProcessingException e) { - throw new RuntimeException("Failed to serialize single-flight result", e); - } - } - - private List unwrap(ResultEnvelope envelope, String digest) { - if (envelope.success()) { - return envelope.results(); - } - - WordsErrorCode leaderErrorCode = parseLeaderErrorCode(envelope.errorCode()); - throw new WordSingleFlightLeaderFailureException( - "Single-flight leader failed for key digest=" + digest + ": " + envelope.errorMessage(), - leaderErrorCode - ); - } - - private String resolveLeaderErrorCode(Throwable throwable) { - Throwable cursor = throwable; - while (cursor != null) { - if (cursor instanceof WordsException wordsException && wordsException.getErrorCode() != null) { - return wordsException.getErrorCode().name(); - } - cursor = cursor.getCause(); - } - return null; - } - - private WordsErrorCode parseLeaderErrorCode(String rawErrorCode) { - if (rawErrorCode == null || rawErrorCode.isBlank()) { - return null; - } - - try { - return WordsErrorCode.valueOf(rawErrorCode); - } catch (IllegalArgumentException e) { - log.warn("Unknown single-flight leader error code: {}", rawErrorCode); - return null; - } - } - private void registerWaiter(String channel, CompletableFuture signal) { channelWaiters.compute(channel, (key, waiters) -> { CopyOnWriteArrayList> values = waiters == null @@ -274,7 +238,6 @@ private KeySet buildKeySet(String word, LanguageCode targetLanguage) { return new KeySet( LOCK_PREFIX + ":" + suffix, - RESULT_PREFIX + ":" + suffix, DONE_PREFIX + ":" + suffix, digest ); @@ -292,23 +255,7 @@ private String sha256(String value) { private record KeySet( String lockKey, - String resultKey, String channel, String digest ) { } - - private record ResultEnvelope( - boolean success, - List results, - String errorMessage, - String errorCode - ) { - static ResultEnvelope success(List results) { - return new ResultEnvelope(true, results, null, null); - } - - static ResultEnvelope failed(String errorMessage, String errorCode) { - return new ResultEnvelope(false, List.of(), errorMessage, errorCode); - } - } } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index e86fd1f7..eb5d98c1 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -51,10 +51,9 @@ firebase.config=${FIREBASE_CONFIG_BASE64} # Redis spring.data.redis.ssl.enabled=false -# Word single-flight (Redis lock + Pub/Sub + result key fallback) +# Word single-flight (Redis lock + Pub/Sub + DB lookup fallback) word.single-flight.enabled=true word.single-flight.wait-timeout-ms=11000 -word.single-flight.result-ttl-ms=25000 word.single-flight.result-schema-version=v2 # AWS S3 (AI Input/Output buckets) diff --git a/src/test/java/com/linglevel/api/word/service/WordServiceTest.java b/src/test/java/com/linglevel/api/word/service/WordServiceTest.java index bd729e37..1987aeca 100644 --- a/src/test/java/com/linglevel/api/word/service/WordServiceTest.java +++ b/src/test/java/com/linglevel/api/word/service/WordServiceTest.java @@ -3,6 +3,7 @@ import com.linglevel.api.bookmark.repository.WordBookmarkRepository; import com.linglevel.api.i18n.LanguageCode; import com.linglevel.api.word.dto.*; +import com.linglevel.api.word.entity.InvalidWord; import com.linglevel.api.word.entity.Word; import com.linglevel.api.word.entity.WordVariant; import com.linglevel.api.word.exception.WordsException; @@ -60,10 +61,15 @@ class WordServiceTest { @BeforeEach void setUp() { - lenient().when(singleFlightCoordinator.execute(anyString(), any(LanguageCode.class), any())) + lenient().when(singleFlightCoordinator.execute(anyString(), any(LanguageCode.class), any(), any())) .thenAnswer(invocation -> { - @SuppressWarnings("unchecked") - Supplier> supplier = invocation.getArgument(2); + Supplier> lookup = invocation.getArgument(3); + Optional existing = lookup.get(); + if (existing.isPresent()) { + return existing.get(); + } + + Supplier supplier = invocation.getArgument(2); return supplier.get(); }); @@ -267,7 +273,7 @@ void getOrCreateWords_singleFlightTimeout_doesNotCacheInvalidWord() { when(wordVariantRepository.findAllByWord(word)).thenReturn(List.of()); when(invalidWordRepository.findByWord(word)).thenReturn(Optional.empty()); - when(singleFlightCoordinator.execute(eq(word), eq(LanguageCode.KO), any())) + when(singleFlightCoordinator.execute(eq(word), eq(LanguageCode.KO), any(), any())) .thenThrow(new WordSingleFlightTimeoutException("Timed out waiting single-flight result")); assertThatThrownBy(() -> wordService.getOrCreateWords(userId, word, LanguageCode.KO)) @@ -292,7 +298,7 @@ void getOrCreateWords_translationMissTimeout_returnsDomainTimeoutError() { when(wordVariantRepository.findAllByWord(inputWord)).thenReturn(List.of(wordVariant)); when(wordRepository.findByWordAndTargetLanguageCode(originalForm, LanguageCode.KO)) .thenReturn(Optional.empty()); - when(singleFlightCoordinator.execute(eq(originalForm), eq(LanguageCode.KO), any())) + when(singleFlightCoordinator.execute(eq(originalForm), eq(LanguageCode.KO), any(), any())) .thenThrow(new WordSingleFlightTimeoutException("Timed out waiting single-flight result")); assertThatThrownBy(() -> wordService.getOrCreateWords(userId, inputWord, LanguageCode.KO)) @@ -301,17 +307,14 @@ void getOrCreateWords_translationMissTimeout_returnsDomainTimeoutError() { } @Test - @DisplayName("single-flight leader가 무의미 단어로 실패하면 follower도 동일 도메인 에러를 반환하고 invalid 캐시에 반영") - void getOrCreateWords_singleFlightLeaderFailureMeaningless_mapsToDomainError() { + @DisplayName("single-flight leader 요청에서 AI 실패가 발생하면 invalid 캐시에 반영") + void getOrCreateWords_singleFlightLeaderRuntimeFailure_cachesInvalidWord() { String word = "resilience"; when(wordVariantRepository.findAllByWord(word)).thenReturn(List.of()); when(invalidWordRepository.findByWord(word)).thenReturn(Optional.empty()); - when(singleFlightCoordinator.execute(eq(word), eq(LanguageCode.KO), any())) - .thenThrow(new WordSingleFlightLeaderFailureException( - "Single-flight leader failed", - com.linglevel.api.word.exception.WordsErrorCode.WORD_IS_MEANINGLESS - )); + when(singleFlightCoordinator.execute(eq(word), eq(LanguageCode.KO), any(), any())) + .thenThrow(new RuntimeException("bedrock failure")); assertThatThrownBy(() -> wordService.getOrCreateWords(userId, word, LanguageCode.KO)) .isInstanceOf(WordsException.class) @@ -321,8 +324,50 @@ void getOrCreateWords_singleFlightLeaderFailureMeaningless_mapsToDomainError() { } @Test - @DisplayName("translation-miss 경로의 leader 실패가 무의미 단어면 동일 도메인 에러를 반환") - void getOrCreateWords_translationMissLeaderFailure_mapsToDomainMeaninglessError() { + @DisplayName("3회 미만 invalid 캐시는 single-flight 사전 조회에서도 재시도를 허용") + void getOrCreateWordEntities_cachedInvalidBelowThreshold_allowsRetryThroughSingleFlightLookup() { + String word = "resilience"; + InvalidWord cachedInvalidWord = InvalidWord.builder() + .word(word) + .attemptCount(1) + .build(); + + WordAnalysisResult analysisResult = WordAnalysisResult.builder() + .originalForm(word) + .variantTypes(List.of(VariantType.ORIGINAL_FORM)) + .sourceLanguageCode(LanguageCode.EN) + .targetLanguageCode(LanguageCode.KO) + .summary(List.of("회복력")) + .meanings(List.of()) + .build(); + + Word savedWord = Word.builder() + .word(word) + .sourceLanguageCode(LanguageCode.EN) + .targetLanguageCode(LanguageCode.KO) + .summary(List.of("회복력")) + .meanings(List.of()) + .build(); + + when(wordVariantRepository.findAllByWord(word)).thenReturn(List.of()); + when(invalidWordRepository.findByWord(word)).thenReturn(Optional.of(cachedInvalidWord)); + when(wordAiService.analyzeWord(word, LanguageCode.KO.getCode())).thenReturn(List.of(analysisResult)); + when(wordRepository.findByWordAndSourceLanguageCodeAndTargetLanguageCode(word, LanguageCode.EN, LanguageCode.KO)) + .thenReturn(Optional.empty()); + when(wordRepository.save(any(Word.class))).thenReturn(savedWord); + when(wordVariantRepository.findByWordAndOriginalForm(word, word)).thenReturn(Optional.empty()); + + List variants = wordService.getOrCreateWordEntities(word, LanguageCode.KO); + + assertThat(variants).hasSize(1); + assertThat(variants.get(0).getOriginalForm()).isEqualTo(word); + verify(wordAiService).analyzeWord(word, LanguageCode.KO.getCode()); + verify(invalidWordRepository).delete(cachedInvalidWord); + } + + @Test + @DisplayName("translation-miss 경로에서 follower DB 조회가 도메인 에러를 반환하면 그대로 전파") + void getOrCreateWords_translationMissFollowerLookupFailure_propagatesDomainError() { String inputWord = "ran"; String originalForm = "run"; @@ -335,11 +380,8 @@ void getOrCreateWords_translationMissLeaderFailure_mapsToDomainMeaninglessError( when(wordVariantRepository.findAllByWord(inputWord)).thenReturn(List.of(wordVariant)); when(wordRepository.findByWordAndTargetLanguageCode(originalForm, LanguageCode.KO)) .thenReturn(Optional.empty()); - when(singleFlightCoordinator.execute(eq(originalForm), eq(LanguageCode.KO), any())) - .thenThrow(new WordSingleFlightLeaderFailureException( - "Single-flight leader failed", - com.linglevel.api.word.exception.WordsErrorCode.WORD_IS_MEANINGLESS - )); + when(singleFlightCoordinator.execute(eq(originalForm), eq(LanguageCode.KO), any(), any())) + .thenThrow(new WordsException(com.linglevel.api.word.exception.WordsErrorCode.WORD_IS_MEANINGLESS)); assertThatThrownBy(() -> wordService.getOrCreateWords(userId, inputWord, LanguageCode.KO)) .isInstanceOf(WordsException.class) diff --git a/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorIntegrationTest.java b/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorIntegrationTest.java index 79621b0a..e3698e50 100644 --- a/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorIntegrationTest.java +++ b/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorIntegrationTest.java @@ -1,18 +1,17 @@ package com.linglevel.api.word.service; -import com.fasterxml.jackson.databind.ObjectMapper; import com.linglevel.api.common.AbstractRedisTest; import com.linglevel.api.i18n.LanguageCode; import com.linglevel.api.word.dto.WordAnalysisResult; -import org.redisson.Redisson; -import org.redisson.api.RedissonClient; -import org.redisson.config.Config; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; -import org.springframework.data.redis.connection.RedisStandaloneConfiguration; +import org.redisson.Redisson; +import org.redisson.api.RedissonClient; +import org.redisson.config.Config; import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.connection.RedisStandaloneConfiguration; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.RedisMessageListenerContainer; @@ -20,12 +19,14 @@ import org.testcontainers.containers.GenericContainer; import java.util.List; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -53,28 +54,43 @@ void tearDown() { } @Test - @DisplayName("실제 Redis에서 두 인스턴스 동시 요청 시 AI 호출은 1회만 수행된다") + @DisplayName("실제 Redis에서 두 인스턴스 동시 요청 시 AI 호출은 1회만 수행되고 follower는 조회 결과를 반환한다") void deduplicatesAcrossTwoCoordinatorsUsingRealRedis() throws Exception { AtomicInteger aiCalls = new AtomicInteger(); + AtomicReference> stored = new AtomicReference<>(); ExecutorService executor = Executors.newFixedThreadPool(2); CountDownLatch start = new CountDownLatch(1); try { Future> f1 = executor.submit(() -> { start.await(1, TimeUnit.SECONDS); - return nodeA.coordinator.execute("run", LanguageCode.KO, () -> { - aiCalls.incrementAndGet(); - sleep(250); - return List.of(sample("run")); - }); + return nodeA.coordinator.execute( + "run", + LanguageCode.KO, + () -> { + aiCalls.incrementAndGet(); + sleep(250); + List result = List.of(sample("run")); + stored.set(result); + return result; + }, + () -> Optional.ofNullable(stored.get()) + ); }); Future> f2 = executor.submit(() -> { start.await(1, TimeUnit.SECONDS); - return nodeB.coordinator.execute("run", LanguageCode.KO, () -> { - aiCalls.incrementAndGet(); - return List.of(sample("run")); - }); + return nodeB.coordinator.execute( + "run", + LanguageCode.KO, + () -> { + aiCalls.incrementAndGet(); + List result = List.of(sample("run")); + stored.set(result); + return result; + }, + () -> Optional.ofNullable(stored.get()) + ); }); start.countDown(); @@ -93,28 +109,50 @@ void deduplicatesAcrossTwoCoordinatorsUsingRealRedis() throws Exception { } @Test - @DisplayName("leader 실패는 실제 Redis resultKey를 통해 follower에도 동일 전파된다") - void propagatesLeaderFailureAcrossTwoCoordinatorsUsingRealRedis() { + @DisplayName("leader 실패 후 저장 결과가 없으면 follower는 timeout으로 실패한다") + void followerTimesOutWhenLeaderFailsWithoutStoredResultUsingRealRedis() throws Exception { RuntimeException leaderFailure = new RuntimeException("bedrock unavailable"); AtomicInteger aiCalls = new AtomicInteger(); + ExecutorService executor = Executors.newFixedThreadPool(2); + CountDownLatch leaderEntered = new CountDownLatch(1); - assertThatThrownBy(() -> - nodeA.coordinator.execute("left", LanguageCode.KO, () -> { - aiCalls.incrementAndGet(); - throw leaderFailure; - }) - ).isInstanceOf(RuntimeException.class) - .hasMessageContaining("bedrock unavailable"); - - assertThatThrownBy(() -> - nodeB.coordinator.execute("left", LanguageCode.KO, () -> { - aiCalls.incrementAndGet(); - return List.of(sample("left")); - }) - ).isInstanceOf(RuntimeException.class) - .hasMessageContaining("Single-flight leader failed"); - - assertThat(aiCalls.get()).isEqualTo(1); + try { + Future> leader = executor.submit(() -> + nodeA.coordinator.execute( + "left", + LanguageCode.KO, + () -> { + aiCalls.incrementAndGet(); + leaderEntered.countDown(); + sleep(250); + throw leaderFailure; + }, + Optional::empty + ) + ); + + assertThat(leaderEntered.await(2, TimeUnit.SECONDS)).isTrue(); + + Future> follower = executor.submit(() -> + nodeB.coordinator.execute( + "left", + LanguageCode.KO, + () -> { + aiCalls.incrementAndGet(); + return List.of(sample("left")); + }, + Optional::empty + ) + ); + + assertThatThrownBy(() -> leader.get(5, TimeUnit.SECONDS)) + .hasCause(leaderFailure); + assertThatThrownBy(() -> follower.get(5, TimeUnit.SECONDS)) + .hasCauseInstanceOf(WordSingleFlightTimeoutException.class); + assertThat(aiCalls.get()).isEqualTo(1); + } finally { + executor.shutdownNow(); + } } private CoordinatorFixture createNode(long waitTimeoutMs) { @@ -141,15 +179,13 @@ private CoordinatorFixture createNode(long waitTimeoutMs) { WordSingleFlightProperties properties = new WordSingleFlightProperties(); properties.setEnabled(true); properties.setWaitTimeoutMs(waitTimeoutMs); - properties.setResultTtlMs(30_000); properties.setResultSchemaVersion("v2"); WordSingleFlightRedisCoordinator coordinator = new WordSingleFlightRedisCoordinator( template, listenerContainer, redissonClient, - properties, - new ObjectMapper() + properties ); ReflectionTestUtils.invokeMethod(coordinator, "initialize"); diff --git a/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorTest.java b/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorTest.java index 7f7e18d1..a8828278 100644 --- a/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorTest.java +++ b/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorTest.java @@ -1,12 +1,7 @@ package com.linglevel.api.word.service; -import com.fasterxml.jackson.databind.ObjectMapper; import com.linglevel.api.i18n.LanguageCode; import com.linglevel.api.word.dto.WordAnalysisResult; -import com.linglevel.api.word.exception.WordsErrorCode; -import com.linglevel.api.word.exception.WordsException; -import org.redisson.api.RLock; -import org.redisson.api.RedissonClient; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -15,29 +10,26 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; +import org.redisson.api.RLock; +import org.redisson.api.RedissonClient; import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.data.redis.core.ValueOperations; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.test.util.ReflectionTestUtils; -import java.time.Duration; -import java.util.ArrayList; -import java.util.HashMap; +import java.util.Arrays; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -57,12 +49,6 @@ class WordSingleFlightRedisCoordinatorTest { @Mock private RLock redissonLock; - @Mock - private ValueOperations valueOperations; - - private final ObjectMapper objectMapper = new ObjectMapper(); - private final Map redisStore = new ConcurrentHashMap<>(); - private WordSingleFlightProperties properties; private WordSingleFlightRedisCoordinator coordinator; @@ -71,44 +57,28 @@ void setUp() { properties = new WordSingleFlightProperties(); properties.setEnabled(true); properties.setWaitTimeoutMs(120); - properties.setResultTtlMs(2_000); properties.setResultSchemaVersion("v2"); - when(stringRedisTemplate.opsForValue()).thenReturn(valueOperations); when(redissonClient.getLock(anyString())).thenReturn(redissonLock); when(redissonLock.isHeldByCurrentThread()).thenReturn(true); - doAnswer(invocation -> redisStore.get(invocation.getArgument(0))) - .when(valueOperations).get(anyString()); - - doAnswer(invocation -> { - String key = invocation.getArgument(0); - String value = invocation.getArgument(1); - redisStore.put(key, value); - return null; - }).when(valueOperations).set(anyString(), anyString(), any(Duration.class)); - coordinator = new WordSingleFlightRedisCoordinator( stringRedisTemplate, redisMessageListenerContainer, redissonClient, - properties, - objectMapper + properties ); ReflectionTestUtils.invokeMethod(coordinator, "initialize"); } @Test - @DisplayName("동일 키 동시 요청은 leader action을 한 번만 실행한다") + @DisplayName("동일 키 동시 요청은 leader action을 한 번만 실행하고 follower는 조회 함수 결과를 반환한다") void execute_deduplicatesConcurrentRequests() throws Exception { stubTryLock(true, false); AtomicInteger aiCalls = new AtomicInteger(); - WordAnalysisResult sample = WordAnalysisResult.builder() - .originalForm("run") - .targetLanguageCode(LanguageCode.KO) - .sourceLanguageCode(LanguageCode.EN) - .build(); + AtomicReference> stored = new AtomicReference<>(); + WordAnalysisResult sample = sample("run"); CountDownLatch start = new CountDownLatch(1); ExecutorService executor = Executors.newFixedThreadPool(2); @@ -116,19 +86,33 @@ void execute_deduplicatesConcurrentRequests() throws Exception { try { Future> f1 = executor.submit(() -> { start.await(1, TimeUnit.SECONDS); - return coordinator.execute("run", LanguageCode.KO, () -> { - aiCalls.incrementAndGet(); - sleep(50); - return List.of(sample); - }); + return coordinator.execute( + "run", + LanguageCode.KO, + () -> { + aiCalls.incrementAndGet(); + sleep(50); + List result = List.of(sample); + stored.set(result); + return result; + }, + () -> Optional.ofNullable(stored.get()) + ); }); Future> f2 = executor.submit(() -> { start.await(1, TimeUnit.SECONDS); - return coordinator.execute("run", LanguageCode.KO, () -> { - aiCalls.incrementAndGet(); - return List.of(sample); - }); + return coordinator.execute( + "run", + LanguageCode.KO, + () -> { + aiCalls.incrementAndGet(); + List result = List.of(sample); + stored.set(result); + return result; + }, + () -> Optional.ofNullable(stored.get()) + ); }); start.countDown(); @@ -145,87 +129,96 @@ void execute_deduplicatesConcurrentRequests() throws Exception { } @Test - @DisplayName("알림 유실 상황에서도 timeout 이후 resultKey 재조회로 결과를 반환한다") - void execute_fallbacksToResultKeyAfterTimeout() { + @DisplayName("알림 유실 상황에서도 timeout 이후 조회 함수로 DB 결과를 반환한다") + void execute_fallbacksToLookupAfterTimeout() { stubTryLock(false); - redisStore.clear(); - - WordAnalysisResult sample = WordAnalysisResult.builder() - .originalForm("book") - .targetLanguageCode(LanguageCode.KO) - .sourceLanguageCode(LanguageCode.EN) - .build(); - - String serialized = toSuccessEnvelopeJson(sample); - AtomicInteger getCalls = new AtomicInteger(); - - doAnswer(invocation -> { - // execute() 내부 readResult 호출 순서: - // 1) 캐시 확인 -> null - // 2) follower 진입 후 pre-check -> null - // 3) register 후 post-check -> null - // 4) timeout 후 final-check -> success - int n = getCalls.incrementAndGet(); - if (n < 4) { - return null; - } - return serialized; - }).when(valueOperations).get(anyString()); + WordAnalysisResult sample = sample("book"); + AtomicInteger lookupCalls = new AtomicInteger(); List result = coordinator.execute( "book", LanguageCode.KO, () -> { throw new IllegalStateException("follower path should not run leader action"); + }, + () -> { + int n = lookupCalls.incrementAndGet(); + if (n < 3) { + return Optional.empty(); + } + return Optional.of(List.of(sample)); } ); assertThat(result).hasSize(1); assertThat(result.get(0).getOriginalForm()).isEqualTo("book"); - assertThat(getCalls.get()).isGreaterThanOrEqualTo(4); + assertThat(lookupCalls.get()).isGreaterThanOrEqualTo(3); } @Test - @DisplayName("leader 실패 결과는 같은 키 요청에 동일하게 전파된다") - void execute_propagatesLeaderFailure() { + @DisplayName("leader 실패 후 DB 결과가 없으면 follower는 timeout으로 실패한다") + void execute_followerTimesOutWhenLeaderFailsWithoutStoredResult() { stubTryLock(true, false); RuntimeException failure = new RuntimeException("bedrock failure"); assertThatThrownBy(() -> - coordinator.execute("left", LanguageCode.KO, () -> { - throw failure; - }) - ).isInstanceOf(RuntimeException.class) - .hasMessageContaining("bedrock failure"); + coordinator.execute( + "left", + LanguageCode.KO, + () -> { + throw failure; + }, + Optional::empty + ) + ).isSameAs(failure); assertThatThrownBy(() -> - coordinator.execute("left", LanguageCode.KO, ArrayList::new) - ).isInstanceOf(RuntimeException.class) - .hasMessageContaining("Single-flight leader failed"); + coordinator.execute( + "left", + LanguageCode.KO, + () -> { + throw new IllegalStateException("follower path should not run leader action"); + }, + Optional::empty + ) + ).isInstanceOf(WordSingleFlightTimeoutException.class) + .hasMessageContaining("Timed out waiting single-flight DB result"); } @Test - @DisplayName("leader의 WordsErrorCode는 follower leader-failure 예외로 전달된다") - void execute_propagatesLeaderDomainErrorCode() { - stubTryLock(true, false); - - RuntimeException failure = new RuntimeException("wrapped", new WordsException(WordsErrorCode.WORD_IS_MEANINGLESS)); + @DisplayName("follower 조회 함수의 예외는 그대로 전파된다") + void execute_propagatesFollowerLookupException() { + stubTryLock(false); - assertThatThrownBy(() -> - coordinator.execute("typooo", LanguageCode.KO, () -> { - throw failure; - }) - ).isInstanceOf(RuntimeException.class); + AtomicInteger lookupCalls = new AtomicInteger(); + IllegalStateException failure = new IllegalStateException("db lookup failed"); assertThatThrownBy(() -> - coordinator.execute("typooo", LanguageCode.KO, ArrayList::new) - ).isInstanceOf(WordSingleFlightLeaderFailureException.class) - .satisfies(ex -> assertThat(((WordSingleFlightLeaderFailureException) ex).getLeaderErrorCode()) - .isEqualTo(WordsErrorCode.WORD_IS_MEANINGLESS)); + coordinator.execute( + "typooo", + LanguageCode.KO, + () -> { + throw new IllegalStateException("follower path should not run leader action"); + }, + () -> { + if (lookupCalls.incrementAndGet() == 1) { + return Optional.empty(); + } + throw failure; + } + ) + ).isSameAs(failure); } + private WordAnalysisResult sample(String originalForm) { + return WordAnalysisResult.builder() + .originalForm(originalForm) + .sourceLanguageCode(LanguageCode.EN) + .targetLanguageCode(LanguageCode.KO) + .build(); + } private void sleep(long millis) { try { Thread.sleep(millis); @@ -235,18 +228,6 @@ private void sleep(long millis) { } } - private String toSuccessEnvelopeJson(WordAnalysisResult result) { - try { - Map payload = new HashMap<>(); - payload.put("success", true); - payload.put("results", List.of(result)); - payload.put("errorMessage", null); - return objectMapper.writeValueAsString(payload); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - private void stubTryLock(boolean first, boolean... others) { Boolean[] sequence = new Boolean[others.length + 1]; sequence[0] = first; @@ -256,7 +237,7 @@ private void stubTryLock(boolean first, boolean... others) { try { when(redissonLock.tryLock(0, TimeUnit.MILLISECONDS)) - .thenReturn(sequence[0], java.util.Arrays.copyOfRange(sequence, 1, sequence.length)); + .thenReturn(sequence[0], Arrays.copyOfRange(sequence, 1, sequence.length)); } catch (InterruptedException e) { throw new RuntimeException(e); } From 40a92cc6fbb3714a51b64c704a2f218d7d24b5e3 Mon Sep 17 00:00:00 2001 From: solfe Date: Mon, 15 Jun 2026 23:41:39 +0900 Subject: [PATCH 3/8] fix(word): harden single-flight completion --- .../api/word/service/WordService.java | 42 ++++++++++++------- .../WordSingleFlightRedisCoordinator.java | 31 +++++++++----- .../api/word/service/WordServiceTest.java | 17 ++++++++ .../WordSingleFlightRedisCoordinatorTest.java | 21 ++++++++++ 4 files changed, 85 insertions(+), 26 deletions(-) diff --git a/src/main/java/com/linglevel/api/word/service/WordService.java b/src/main/java/com/linglevel/api/word/service/WordService.java index 70309579..0f883777 100644 --- a/src/main/java/com/linglevel/api/word/service/WordService.java +++ b/src/main/java/com/linglevel/api/word/service/WordService.java @@ -123,22 +123,27 @@ public List getOrCreateWordEntities(String word, LanguageCode targe word, targetLanguage, () -> { - List analysisResults = wordAiService.analyzeWord(word, targetLanguage.getCode()); - - // AI 호출 성공 시 InvalidWord 캐시에서 제거 (일시적 오류였던 경우 복구) - cachedInvalidWord.ifPresent(invalidWord -> { - invalidWordRepository.delete(invalidWord); - log.info("Removed word '{}' from invalid word cache after successful AI analysis (was attempt {}/3)", - word, invalidWord.getAttemptCount()); - }); - - List savedVariants = new ArrayList<>(); - for (WordAnalysisResult analysisResult : analysisResults) { - WordVariant savedVariant = saveWordFromAnalysis(word, analysisResult); - savedVariants.add(savedVariant); - } + try { + List analysisResults = wordAiService.analyzeWord(word, targetLanguage.getCode()); + + // AI 호출 성공 시 InvalidWord 캐시에서 제거 (일시적 오류였던 경우 복구) + cachedInvalidWord.ifPresent(invalidWord -> { + invalidWordRepository.delete(invalidWord); + log.info("Removed word '{}' from invalid word cache after successful AI analysis (was attempt {}/3)", + word, invalidWord.getAttemptCount()); + }); + + List savedVariants = new ArrayList<>(); + for (WordAnalysisResult analysisResult : analysisResults) { + WordVariant savedVariant = saveWordFromAnalysis(word, analysisResult); + savedVariants.add(savedVariant); + } - return savedVariants; + return savedVariants; + } catch (WordsException e) { + cacheInvalidWordIfMeaningless(word, e); + throw e; + } }, () -> findWordVariantsAfterSingleFlight(word, invalidAttemptCountBeforeSingleFlight) ); @@ -177,6 +182,13 @@ private Optional> findWordVariantsAfterSingleFlight( return Optional.empty(); } + private void cacheInvalidWordIfMeaningless(String word, WordsException e) { + if (e.getErrorCode() == WordsErrorCode.WORD_IS_MEANINGLESS) { + log.warn("AI classified word '{}' as meaningless. Updating invalid-word cache.", word, e); + saveInvalidWord(word); + } + } + @Transactional public WordVariant saveWordFromAnalysis(String word, WordAnalysisResult analysisResult) { diff --git a/src/main/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinator.java b/src/main/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinator.java index 838cc4c2..c3822578 100644 --- a/src/main/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinator.java +++ b/src/main/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinator.java @@ -89,14 +89,16 @@ private T executeAsLeader( RLock lock, Supplier leaderAction ) { + T result; try { - T result = leaderAction.get(); - completeLeaderAfterCommit(keys, lock); - return result; + result = leaderAction.get(); } catch (RuntimeException | Error e) { completeLeaderAfterCompletion(keys, lock); throw e; } + + completeLeaderAfterCommit(keys, lock); + return result; } private boolean tryAcquireLeaderLock(RLock lock) { @@ -144,40 +146,47 @@ private T waitAsFollower(KeySet keys, Supplier> followerResultLo private void completeLeaderAfterCommit(KeySet keys, RLock lock) { if (!TransactionSynchronizationManager.isSynchronizationActive()) { - publishDone(keys.channel()); - releaseLock(lock, keys.lockKey()); + publishDoneThenRelease(keys, lock); return; } TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { @Override public void afterCommit() { - publishDone(keys.channel()); + publishDoneThenRelease(keys, lock); } @Override public void afterCompletion(int status) { - releaseLock(lock, keys.lockKey()); + if (status != STATUS_COMMITTED) { + releaseLock(lock, keys.lockKey()); + } } }); } private void completeLeaderAfterCompletion(KeySet keys, RLock lock) { if (!TransactionSynchronizationManager.isSynchronizationActive()) { - publishDone(keys.channel()); - releaseLock(lock, keys.lockKey()); + publishDoneThenRelease(keys, lock); return; } TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { @Override public void afterCompletion(int status) { - publishDone(keys.channel()); - releaseLock(lock, keys.lockKey()); + publishDoneThenRelease(keys, lock); } }); } + private void publishDoneThenRelease(KeySet keys, RLock lock) { + try { + publishDone(keys.channel()); + } finally { + releaseLock(lock, keys.lockKey()); + } + } + private void publishDone(String channel) { stringRedisTemplate.convertAndSend(channel, "done"); } diff --git a/src/test/java/com/linglevel/api/word/service/WordServiceTest.java b/src/test/java/com/linglevel/api/word/service/WordServiceTest.java index 1987aeca..d8491e0f 100644 --- a/src/test/java/com/linglevel/api/word/service/WordServiceTest.java +++ b/src/test/java/com/linglevel/api/word/service/WordServiceTest.java @@ -323,6 +323,23 @@ void getOrCreateWords_singleFlightLeaderRuntimeFailure_cachesInvalidWord() { verify(invalidWordRepository).save(any()); } + @Test + @DisplayName("single-flight leader의 WORD_IS_MEANINGLESS 예외는 invalid 캐시에 반영") + void getOrCreateWords_singleFlightLeaderMeaninglessException_cachesInvalidWord() { + String word = "asdfqwer"; + + when(wordVariantRepository.findAllByWord(word)).thenReturn(List.of()); + when(invalidWordRepository.findByWord(word)).thenReturn(Optional.empty()); + when(wordAiService.analyzeWord(word, LanguageCode.KO.getCode())) + .thenThrow(new WordsException(com.linglevel.api.word.exception.WordsErrorCode.WORD_IS_MEANINGLESS)); + + assertThatThrownBy(() -> wordService.getOrCreateWords(userId, word, LanguageCode.KO)) + .isInstanceOf(WordsException.class) + .hasMessageContaining("meaningless"); + + verify(invalidWordRepository).save(any()); + } + @Test @DisplayName("3회 미만 invalid 캐시는 single-flight 사전 조회에서도 재시도를 허용") void getOrCreateWordEntities_cachedInvalidBelowThreshold_allowsRetryThroughSingleFlightLookup() { diff --git a/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorTest.java b/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorTest.java index a8828278..08d70b72 100644 --- a/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorTest.java +++ b/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorTest.java @@ -30,6 +30,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -212,6 +213,26 @@ void execute_propagatesFollowerLookupException() { ).isSameAs(failure); } + @Test + @DisplayName("done publish가 실패해도 leader lock은 해제한다") + void execute_releasesLeaderLockWhenPublishFails() { + stubTryLock(true); + + RuntimeException publishFailure = new RuntimeException("redis publish failed"); + doThrow(publishFailure).when(stringRedisTemplate).convertAndSend(anyString(), anyString()); + + assertThatThrownBy(() -> + coordinator.execute( + "run", + LanguageCode.KO, + () -> List.of(sample("run")), + Optional::empty + ) + ).isSameAs(publishFailure); + + verify(redissonLock).unlock(); + } + private WordAnalysisResult sample(String originalForm) { return WordAnalysisResult.builder() .originalForm(originalForm) From 2365cf9955115130c0f965b3215e5175e52126a0 Mon Sep 17 00:00:00 2001 From: solfe Date: Tue, 16 Jun 2026 00:01:15 +0900 Subject: [PATCH 4/8] refactor(word): align single-flight config and errors --- .../WordSingleFlightProperties.java | 2 +- .../api/word/service/WordService.java | 37 +++++++------------ .../WordSingleFlightRedisCoordinator.java | 7 ++-- .../WordSingleFlightTimeoutException.java | 8 ---- .../api/word/service/WordServiceTest.java | 9 +++-- ...FlightRedisCoordinatorIntegrationTest.java | 4 +- .../WordSingleFlightRedisCoordinatorTest.java | 8 +++- 7 files changed, 33 insertions(+), 42 deletions(-) rename src/main/java/com/linglevel/api/word/{service => config}/WordSingleFlightProperties.java (91%) delete mode 100644 src/main/java/com/linglevel/api/word/service/WordSingleFlightTimeoutException.java diff --git a/src/main/java/com/linglevel/api/word/service/WordSingleFlightProperties.java b/src/main/java/com/linglevel/api/word/config/WordSingleFlightProperties.java similarity index 91% rename from src/main/java/com/linglevel/api/word/service/WordSingleFlightProperties.java rename to src/main/java/com/linglevel/api/word/config/WordSingleFlightProperties.java index 47337074..56b4c4bd 100644 --- a/src/main/java/com/linglevel/api/word/service/WordSingleFlightProperties.java +++ b/src/main/java/com/linglevel/api/word/config/WordSingleFlightProperties.java @@ -1,4 +1,4 @@ -package com.linglevel.api.word.service; +package com.linglevel.api.word.config; import lombok.Getter; import lombok.Setter; diff --git a/src/main/java/com/linglevel/api/word/service/WordService.java b/src/main/java/com/linglevel/api/word/service/WordService.java index 0f883777..2e1d7fc8 100644 --- a/src/main/java/com/linglevel/api/word/service/WordService.java +++ b/src/main/java/com/linglevel/api/word/service/WordService.java @@ -50,28 +50,22 @@ public WordSearchResponse getOrCreateWords(String userId, String word, LanguageC log.info("Word '{}' not found for targetLanguage {}, creating new one...", wordVariant.getOriginalForm(), targetLanguage); - try { - return singleFlightCoordinator.execute( - wordVariant.getOriginalForm(), - targetLanguage, - () -> { - List analysisResults = wordAiService.analyzeWord( + return singleFlightCoordinator.execute( + wordVariant.getOriginalForm(), + targetLanguage, + () -> { + List analysisResults = wordAiService.analyzeWord( wordVariant.getOriginalForm(), targetLanguage.getCode() - ); - Word newWord = convertAnalysisResultToWord(analysisResults.get(0)); - return wordRepository.save(newWord); - }, - () -> wordRepository.findByWordAndTargetLanguageCode( - wordVariant.getOriginalForm(), - targetLanguage - ) - ); - } catch (WordSingleFlightTimeoutException e) { - log.warn("Single-flight temporary failure for originalForm '{}'. Returning timeout error.", - wordVariant.getOriginalForm(), e); - throw new WordsException(WordsErrorCode.WORD_ANALYSIS_TIMEOUT); - } + ); + Word newWord = convertAnalysisResultToWord(analysisResults.get(0)); + return wordRepository.save(newWord); + }, + () -> wordRepository.findByWordAndTargetLanguageCode( + wordVariant.getOriginalForm(), + targetLanguage + ) + ); }); boolean isBookmarked = wordBookmarkRepository.existsByUserIdAndWord(userId, wordVariant.getOriginalForm()); @@ -148,9 +142,6 @@ public List getOrCreateWordEntities(String word, LanguageCode targe () -> findWordVariantsAfterSingleFlight(word, invalidAttemptCountBeforeSingleFlight) ); - } catch (WordSingleFlightTimeoutException e) { - log.warn("Single-flight temporary failure for word '{}'. Keeping invalid-word cache untouched.", word, e); - throw new WordsException(WordsErrorCode.WORD_ANALYSIS_TIMEOUT); } catch (WordsException e) { throw e; } catch (Exception e) { diff --git a/src/main/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinator.java b/src/main/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinator.java index c3822578..4965736b 100644 --- a/src/main/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinator.java +++ b/src/main/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinator.java @@ -1,6 +1,9 @@ package com.linglevel.api.word.service; import com.linglevel.api.i18n.LanguageCode; +import com.linglevel.api.word.config.WordSingleFlightProperties; +import com.linglevel.api.word.exception.WordsErrorCode; +import com.linglevel.api.word.exception.WordsException; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.RequiredArgsConstructor; @@ -139,9 +142,7 @@ private T waitAsFollower(KeySet keys, Supplier> followerResultLo return finalResult.get(); } - throw new WordSingleFlightTimeoutException( - "Timed out waiting single-flight DB result for key digest=" + keys.digest() - ); + throw new WordsException(WordsErrorCode.WORD_ANALYSIS_TIMEOUT); } private void completeLeaderAfterCommit(KeySet keys, RLock lock) { diff --git a/src/main/java/com/linglevel/api/word/service/WordSingleFlightTimeoutException.java b/src/main/java/com/linglevel/api/word/service/WordSingleFlightTimeoutException.java deleted file mode 100644 index cdf98a12..00000000 --- a/src/main/java/com/linglevel/api/word/service/WordSingleFlightTimeoutException.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.linglevel.api.word.service; - -public class WordSingleFlightTimeoutException extends RuntimeException { - - public WordSingleFlightTimeoutException(String message) { - super(message); - } -} diff --git a/src/test/java/com/linglevel/api/word/service/WordServiceTest.java b/src/test/java/com/linglevel/api/word/service/WordServiceTest.java index d8491e0f..831703e7 100644 --- a/src/test/java/com/linglevel/api/word/service/WordServiceTest.java +++ b/src/test/java/com/linglevel/api/word/service/WordServiceTest.java @@ -6,6 +6,7 @@ import com.linglevel.api.word.entity.InvalidWord; import com.linglevel.api.word.entity.Word; import com.linglevel.api.word.entity.WordVariant; +import com.linglevel.api.word.exception.WordsErrorCode; import com.linglevel.api.word.exception.WordsException; import com.linglevel.api.word.repository.InvalidWordRepository; import com.linglevel.api.word.repository.WordRepository; @@ -274,7 +275,7 @@ void getOrCreateWords_singleFlightTimeout_doesNotCacheInvalidWord() { when(wordVariantRepository.findAllByWord(word)).thenReturn(List.of()); when(invalidWordRepository.findByWord(word)).thenReturn(Optional.empty()); when(singleFlightCoordinator.execute(eq(word), eq(LanguageCode.KO), any(), any())) - .thenThrow(new WordSingleFlightTimeoutException("Timed out waiting single-flight result")); + .thenThrow(new WordsException(WordsErrorCode.WORD_ANALYSIS_TIMEOUT)); assertThatThrownBy(() -> wordService.getOrCreateWords(userId, word, LanguageCode.KO)) .isInstanceOf(WordsException.class) @@ -299,7 +300,7 @@ void getOrCreateWords_translationMissTimeout_returnsDomainTimeoutError() { when(wordRepository.findByWordAndTargetLanguageCode(originalForm, LanguageCode.KO)) .thenReturn(Optional.empty()); when(singleFlightCoordinator.execute(eq(originalForm), eq(LanguageCode.KO), any(), any())) - .thenThrow(new WordSingleFlightTimeoutException("Timed out waiting single-flight result")); + .thenThrow(new WordsException(WordsErrorCode.WORD_ANALYSIS_TIMEOUT)); assertThatThrownBy(() -> wordService.getOrCreateWords(userId, inputWord, LanguageCode.KO)) .isInstanceOf(WordsException.class) @@ -331,7 +332,7 @@ void getOrCreateWords_singleFlightLeaderMeaninglessException_cachesInvalidWord() when(wordVariantRepository.findAllByWord(word)).thenReturn(List.of()); when(invalidWordRepository.findByWord(word)).thenReturn(Optional.empty()); when(wordAiService.analyzeWord(word, LanguageCode.KO.getCode())) - .thenThrow(new WordsException(com.linglevel.api.word.exception.WordsErrorCode.WORD_IS_MEANINGLESS)); + .thenThrow(new WordsException(WordsErrorCode.WORD_IS_MEANINGLESS)); assertThatThrownBy(() -> wordService.getOrCreateWords(userId, word, LanguageCode.KO)) .isInstanceOf(WordsException.class) @@ -398,7 +399,7 @@ void getOrCreateWords_translationMissFollowerLookupFailure_propagatesDomainError when(wordRepository.findByWordAndTargetLanguageCode(originalForm, LanguageCode.KO)) .thenReturn(Optional.empty()); when(singleFlightCoordinator.execute(eq(originalForm), eq(LanguageCode.KO), any(), any())) - .thenThrow(new WordsException(com.linglevel.api.word.exception.WordsErrorCode.WORD_IS_MEANINGLESS)); + .thenThrow(new WordsException(WordsErrorCode.WORD_IS_MEANINGLESS)); assertThatThrownBy(() -> wordService.getOrCreateWords(userId, inputWord, LanguageCode.KO)) .isInstanceOf(WordsException.class) diff --git a/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorIntegrationTest.java b/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorIntegrationTest.java index e3698e50..58fca00f 100644 --- a/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorIntegrationTest.java +++ b/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorIntegrationTest.java @@ -2,7 +2,9 @@ import com.linglevel.api.common.AbstractRedisTest; import com.linglevel.api.i18n.LanguageCode; +import com.linglevel.api.word.config.WordSingleFlightProperties; import com.linglevel.api.word.dto.WordAnalysisResult; +import com.linglevel.api.word.exception.WordsException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; @@ -148,7 +150,7 @@ void followerTimesOutWhenLeaderFailsWithoutStoredResultUsingRealRedis() throws E assertThatThrownBy(() -> leader.get(5, TimeUnit.SECONDS)) .hasCause(leaderFailure); assertThatThrownBy(() -> follower.get(5, TimeUnit.SECONDS)) - .hasCauseInstanceOf(WordSingleFlightTimeoutException.class); + .hasCauseInstanceOf(WordsException.class); assertThat(aiCalls.get()).isEqualTo(1); } finally { executor.shutdownNow(); diff --git a/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorTest.java b/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorTest.java index 08d70b72..f0a5fc32 100644 --- a/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorTest.java +++ b/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorTest.java @@ -1,7 +1,10 @@ package com.linglevel.api.word.service; import com.linglevel.api.i18n.LanguageCode; +import com.linglevel.api.word.config.WordSingleFlightProperties; import com.linglevel.api.word.dto.WordAnalysisResult; +import com.linglevel.api.word.exception.WordsErrorCode; +import com.linglevel.api.word.exception.WordsException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -184,8 +187,9 @@ void execute_followerTimesOutWhenLeaderFailsWithoutStoredResult() { }, Optional::empty ) - ).isInstanceOf(WordSingleFlightTimeoutException.class) - .hasMessageContaining("Timed out waiting single-flight DB result"); + ).isInstanceOf(WordsException.class) + .satisfies(ex -> assertThat(((WordsException) ex).getErrorCode()) + .isEqualTo(WordsErrorCode.WORD_ANALYSIS_TIMEOUT)); } @Test From ca9c3786bedd4971722fca701133a2614530cdd2 Mon Sep 17 00:00:00 2001 From: solfe Date: Tue, 16 Jun 2026 00:11:08 +0900 Subject: [PATCH 5/8] fix(word): defer single-flight lookup until lock completion --- .../WordSingleFlightRedisCoordinator.java | 44 ++++++++++++++----- .../WordSingleFlightRedisCoordinatorTest.java | 37 ++++++++++++---- 2 files changed, 61 insertions(+), 20 deletions(-) diff --git a/src/main/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinator.java b/src/main/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinator.java index 4965736b..9aa91470 100644 --- a/src/main/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinator.java +++ b/src/main/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinator.java @@ -73,18 +73,35 @@ public T execute( } KeySet keys = buildKeySet(word, targetLanguage); - Optional existing = followerResultLookup.get(); - if (existing.isPresent()) { - return existing.get(); - } - RLock lock = createLock(keys.lockKey()); boolean lockAcquired = tryAcquireLeaderLock(lock); if (lockAcquired) { - return executeAsLeader(keys, lock, leaderAction); + return executeWithLeaderLock(keys, lock, leaderAction, followerResultLookup); } - return waitAsFollower(keys, followerResultLookup); + return waitAsFollower(keys, lock, leaderAction, followerResultLookup); + } + + private T executeWithLeaderLock( + KeySet keys, + RLock lock, + Supplier leaderAction, + Supplier> followerResultLookup + ) { + Optional existing; + try { + existing = followerResultLookup.get(); + } catch (RuntimeException | Error e) { + releaseLock(lock, keys.lockKey()); + throw e; + } + + if (existing.isPresent()) { + releaseLock(lock, keys.lockKey()); + return existing.get(); + } + + return executeAsLeader(keys, lock, leaderAction); } private T executeAsLeader( @@ -115,14 +132,19 @@ private boolean tryAcquireLeaderLock(RLock lock) { } } - private T waitAsFollower(KeySet keys, Supplier> followerResultLookup) { + private T waitAsFollower( + KeySet keys, + RLock lock, + Supplier leaderAction, + Supplier> followerResultLookup + ) { CompletableFuture signal = new CompletableFuture<>(); registerWaiter(keys.channel(), signal); try { - Optional afterRegister = followerResultLookup.get(); - if (afterRegister.isPresent()) { - return afterRegister.get(); + boolean lockAcquiredAfterRegister = tryAcquireLeaderLock(lock); + if (lockAcquiredAfterRegister) { + return executeWithLeaderLock(keys, lock, leaderAction, followerResultLookup); } signal.get(properties.getWaitTimeoutMs(), TimeUnit.MILLISECONDS); diff --git a/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorTest.java b/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorTest.java index f0a5fc32..1ff45ee1 100644 --- a/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorTest.java +++ b/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorTest.java @@ -147,17 +147,14 @@ void execute_fallbacksToLookupAfterTimeout() { throw new IllegalStateException("follower path should not run leader action"); }, () -> { - int n = lookupCalls.incrementAndGet(); - if (n < 3) { - return Optional.empty(); - } + lookupCalls.incrementAndGet(); return Optional.of(List.of(sample)); } ); assertThat(result).hasSize(1); assertThat(result.get(0).getOriginalForm()).isEqualTo("book"); - assertThat(lookupCalls.get()).isGreaterThanOrEqualTo(3); + assertThat(lookupCalls.get()).isEqualTo(1); } @Test @@ -192,12 +189,37 @@ void execute_followerTimesOutWhenLeaderFailsWithoutStoredResult() { .isEqualTo(WordsErrorCode.WORD_ANALYSIS_TIMEOUT)); } + @Test + @DisplayName("follower는 leader lock이 유지되는 동안 DB 조회 함수를 실행하지 않는다") + void execute_doesNotRunFollowerLookupWhileLeaderLockIsHeld() { + stubTryLock(false); + + AtomicInteger lookupCalls = new AtomicInteger(); + + assertThatThrownBy(() -> + coordinator.execute( + "saw", + LanguageCode.KO, + () -> { + throw new IllegalStateException("follower path should not run leader action"); + }, + () -> { + lookupCalls.incrementAndGet(); + return Optional.empty(); + } + ) + ).isInstanceOf(WordsException.class) + .satisfies(ex -> assertThat(((WordsException) ex).getErrorCode()) + .isEqualTo(WordsErrorCode.WORD_ANALYSIS_TIMEOUT)); + + assertThat(lookupCalls.get()).isEqualTo(1); + } + @Test @DisplayName("follower 조회 함수의 예외는 그대로 전파된다") void execute_propagatesFollowerLookupException() { stubTryLock(false); - AtomicInteger lookupCalls = new AtomicInteger(); IllegalStateException failure = new IllegalStateException("db lookup failed"); assertThatThrownBy(() -> @@ -208,9 +230,6 @@ void execute_propagatesFollowerLookupException() { throw new IllegalStateException("follower path should not run leader action"); }, () -> { - if (lookupCalls.incrementAndGet() == 1) { - return Optional.empty(); - } throw failure; } ) From df39184b0a4043b1ed7a1094585aff9a7816f76f Mon Sep 17 00:00:00 2001 From: solfe Date: Tue, 16 Jun 2026 00:17:45 +0900 Subject: [PATCH 6/8] fix(word): limit invalid cache to AI failures --- .../api/word/service/WordService.java | 86 ++++++++++--------- .../api/word/service/WordServiceTest.java | 35 +++++++- 2 files changed, 79 insertions(+), 42 deletions(-) diff --git a/src/main/java/com/linglevel/api/word/service/WordService.java b/src/main/java/com/linglevel/api/word/service/WordService.java index 2e1d7fc8..79f1d815 100644 --- a/src/main/java/com/linglevel/api/word/service/WordService.java +++ b/src/main/java/com/linglevel/api/word/service/WordService.java @@ -110,46 +110,28 @@ public List getOrCreateWordEntities(String word, LanguageCode targe word, invalidWord.getAttemptCount(), invalidWord.getAttemptCount() + 1); } - // 3. DB에 없으면 AI 호출 (실패 시에도 InvalidWord로 캐싱) + // 3. DB에 없으면 AI 호출 (AI 분석 실패 시에만 InvalidWord로 캐싱) log.info("Word '{}' not found in database. Calling AI to analyze...", word); - try { - return singleFlightCoordinator.execute( - word, - targetLanguage, - () -> { - try { - List analysisResults = wordAiService.analyzeWord(word, targetLanguage.getCode()); - - // AI 호출 성공 시 InvalidWord 캐시에서 제거 (일시적 오류였던 경우 복구) - cachedInvalidWord.ifPresent(invalidWord -> { - invalidWordRepository.delete(invalidWord); - log.info("Removed word '{}' from invalid word cache after successful AI analysis (was attempt {}/3)", - word, invalidWord.getAttemptCount()); - }); - - List savedVariants = new ArrayList<>(); - for (WordAnalysisResult analysisResult : analysisResults) { - WordVariant savedVariant = saveWordFromAnalysis(word, analysisResult); - savedVariants.add(savedVariant); - } - - return savedVariants; - } catch (WordsException e) { - cacheInvalidWordIfMeaningless(word, e); - throw e; - } - }, - () -> findWordVariantsAfterSingleFlight(word, invalidAttemptCountBeforeSingleFlight) - ); - - } catch (WordsException e) { - throw e; - } catch (Exception e) { - // AI 호출 실패 또는 무의미한 단어인 경우 InvalidWord로 캐싱 - log.warn("AI call failed for word '{}'. Caching as invalid word to prevent retries.", word, e); - saveInvalidWord(word); - throw new WordsException(WordsErrorCode.WORD_IS_MEANINGLESS); - } + return singleFlightCoordinator.execute( + word, + targetLanguage, + () -> { + List analysisResults = analyzeWordAndUpdateInvalidCache( + word, + targetLanguage, + cachedInvalidWord + ); + + List savedVariants = new ArrayList<>(); + for (WordAnalysisResult analysisResult : analysisResults) { + WordVariant savedVariant = saveWordFromAnalysis(word, analysisResult); + savedVariants.add(savedVariant); + } + + return savedVariants; + }, + () -> findWordVariantsAfterSingleFlight(word, invalidAttemptCountBeforeSingleFlight) + ); } private Optional> findWordVariantsAfterSingleFlight( @@ -180,6 +162,32 @@ private void cacheInvalidWordIfMeaningless(String word, WordsException e) { } } + private List analyzeWordAndUpdateInvalidCache( + String word, + LanguageCode targetLanguage, + Optional cachedInvalidWord + ) { + List analysisResults; + try { + analysisResults = wordAiService.analyzeWord(word, targetLanguage.getCode()); + } catch (WordsException e) { + cacheInvalidWordIfMeaningless(word, e); + throw e; + } catch (RuntimeException e) { + log.warn("AI call failed for word '{}'. Caching as invalid word to prevent retries.", word, e); + saveInvalidWord(word); + throw new WordsException(WordsErrorCode.WORD_IS_MEANINGLESS); + } + + cachedInvalidWord.ifPresent(invalidWord -> { + invalidWordRepository.delete(invalidWord); + log.info("Removed word '{}' from invalid word cache after successful AI analysis (was attempt {}/3)", + word, invalidWord.getAttemptCount()); + }); + + return analysisResults; + } + @Transactional public WordVariant saveWordFromAnalysis(String word, WordAnalysisResult analysisResult) { diff --git a/src/test/java/com/linglevel/api/word/service/WordServiceTest.java b/src/test/java/com/linglevel/api/word/service/WordServiceTest.java index 831703e7..9051a12c 100644 --- a/src/test/java/com/linglevel/api/word/service/WordServiceTest.java +++ b/src/test/java/com/linglevel/api/word/service/WordServiceTest.java @@ -18,6 +18,7 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.dao.DataIntegrityViolationException; import java.util.List; import java.util.Optional; @@ -308,13 +309,13 @@ void getOrCreateWords_translationMissTimeout_returnsDomainTimeoutError() { } @Test - @DisplayName("single-flight leader 요청에서 AI 실패가 발생하면 invalid 캐시에 반영") - void getOrCreateWords_singleFlightLeaderRuntimeFailure_cachesInvalidWord() { + @DisplayName("AI 호출 실패가 발생하면 invalid 캐시에 반영") + void getOrCreateWords_aiRuntimeFailure_cachesInvalidWord() { String word = "resilience"; when(wordVariantRepository.findAllByWord(word)).thenReturn(List.of()); when(invalidWordRepository.findByWord(word)).thenReturn(Optional.empty()); - when(singleFlightCoordinator.execute(eq(word), eq(LanguageCode.KO), any(), any())) + when(wordAiService.analyzeWord(word, LanguageCode.KO.getCode())) .thenThrow(new RuntimeException("bedrock failure")); assertThatThrownBy(() -> wordService.getOrCreateWords(userId, word, LanguageCode.KO)) @@ -324,6 +325,34 @@ void getOrCreateWords_singleFlightLeaderRuntimeFailure_cachesInvalidWord() { verify(invalidWordRepository).save(any()); } + @Test + @DisplayName("DB 저장 실패는 invalid 캐시에 반영하지 않고 그대로 전파") + void getOrCreateWords_persistenceFailure_doesNotCacheInvalidWord() { + String word = "resilience"; + DataIntegrityViolationException failure = new DataIntegrityViolationException("duplicate variant"); + + WordAnalysisResult analysisResult = WordAnalysisResult.builder() + .originalForm(word) + .variantTypes(List.of(VariantType.ORIGINAL_FORM)) + .sourceLanguageCode(LanguageCode.EN) + .targetLanguageCode(LanguageCode.KO) + .summary(List.of("회복력")) + .meanings(List.of()) + .build(); + + when(wordVariantRepository.findAllByWord(word)).thenReturn(List.of()); + when(invalidWordRepository.findByWord(word)).thenReturn(Optional.empty()); + when(wordAiService.analyzeWord(word, LanguageCode.KO.getCode())).thenReturn(List.of(analysisResult)); + when(wordRepository.findByWordAndSourceLanguageCodeAndTargetLanguageCode(word, LanguageCode.EN, LanguageCode.KO)) + .thenReturn(Optional.empty()); + when(wordRepository.save(any(Word.class))).thenThrow(failure); + + assertThatThrownBy(() -> wordService.getOrCreateWords(userId, word, LanguageCode.KO)) + .isSameAs(failure); + + verify(invalidWordRepository, never()).save(any()); + } + @Test @DisplayName("single-flight leader의 WORD_IS_MEANINGLESS 예외는 invalid 캐시에 반영") void getOrCreateWords_singleFlightLeaderMeaninglessException_cachesInvalidWord() { From 9e7e317fb566d96ffa309144645a2792ef810121 Mon Sep 17 00:00:00 2001 From: solfe Date: Tue, 16 Jun 2026 12:08:07 +0900 Subject: [PATCH 7/8] fix(word): notify waiters when result already exists --- .../WordSingleFlightRedisCoordinator.java | 2 +- .../WordSingleFlightRedisCoordinatorTest.java | 21 +++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinator.java b/src/main/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinator.java index 9aa91470..11d5b77e 100644 --- a/src/main/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinator.java +++ b/src/main/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinator.java @@ -97,7 +97,7 @@ private T executeWithLeaderLock( } if (existing.isPresent()) { - releaseLock(lock, keys.lockKey()); + publishDoneThenRelease(keys, lock); return existing.get(); } diff --git a/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorTest.java b/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorTest.java index 1ff45ee1..247744ec 100644 --- a/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorTest.java +++ b/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorTest.java @@ -256,6 +256,27 @@ void execute_releasesLeaderLockWhenPublishFails() { verify(redissonLock).unlock(); } + @Test + @DisplayName("lock holder가 기존 결과를 발견하면 대기 중인 follower를 깨우도록 done을 발행한다") + void execute_publishesDoneWhenLockHolderFindsExistingResult() { + stubTryLock(true); + + WordAnalysisResult sample = sample("run"); + + List result = coordinator.execute( + "run", + LanguageCode.KO, + () -> { + throw new IllegalStateException("leader action should not run when result already exists"); + }, + () -> Optional.of(List.of(sample)) + ); + + assertThat(result).hasSize(1); + verify(stringRedisTemplate).convertAndSend(anyString(), anyString()); + verify(redissonLock).unlock(); + } + private WordAnalysisResult sample(String originalForm) { return WordAnalysisResult.builder() .originalForm(originalForm) From 4ec3824d8ead8db75a6943f01e96be73ed6b17ff Mon Sep 17 00:00:00 2001 From: solfe Date: Tue, 16 Jun 2026 12:32:52 +0900 Subject: [PATCH 8/8] fix(word): release single-flight lock before notify --- .../WordSingleFlightRedisCoordinator.java | 19 +++++------ .../WordSingleFlightRedisCoordinatorTest.java | 33 ++++++++++++++++--- 2 files changed, 36 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinator.java b/src/main/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinator.java index 11d5b77e..199dd121 100644 --- a/src/main/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinator.java +++ b/src/main/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinator.java @@ -97,7 +97,7 @@ private T executeWithLeaderLock( } if (existing.isPresent()) { - publishDoneThenRelease(keys, lock); + releaseThenPublishDone(keys, lock); return existing.get(); } @@ -169,14 +169,14 @@ private T waitAsFollower( private void completeLeaderAfterCommit(KeySet keys, RLock lock) { if (!TransactionSynchronizationManager.isSynchronizationActive()) { - publishDoneThenRelease(keys, lock); + releaseThenPublishDone(keys, lock); return; } TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { @Override public void afterCommit() { - publishDoneThenRelease(keys, lock); + releaseThenPublishDone(keys, lock); } @Override @@ -190,24 +190,21 @@ public void afterCompletion(int status) { private void completeLeaderAfterCompletion(KeySet keys, RLock lock) { if (!TransactionSynchronizationManager.isSynchronizationActive()) { - publishDoneThenRelease(keys, lock); + releaseThenPublishDone(keys, lock); return; } TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { @Override public void afterCompletion(int status) { - publishDoneThenRelease(keys, lock); + releaseThenPublishDone(keys, lock); } }); } - private void publishDoneThenRelease(KeySet keys, RLock lock) { - try { - publishDone(keys.channel()); - } finally { - releaseLock(lock, keys.lockKey()); - } + private void releaseThenPublishDone(KeySet keys, RLock lock) { + releaseLock(lock, keys.lockKey()); + publishDone(keys.channel()); } private void publishDone(String channel) { diff --git a/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorTest.java b/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorTest.java index 247744ec..a52d2a08 100644 --- a/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorTest.java +++ b/src/test/java/com/linglevel/api/word/service/WordSingleFlightRedisCoordinatorTest.java @@ -9,6 +9,7 @@ import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; @@ -34,6 +35,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -237,8 +239,26 @@ void execute_propagatesFollowerLookupException() { } @Test - @DisplayName("done publish가 실패해도 leader lock은 해제한다") - void execute_releasesLeaderLockWhenPublishFails() { + @DisplayName("leader 완료 시 lock을 해제한 뒤 done을 발행한다") + void execute_releasesLeaderLockBeforePublishingDone() { + stubTryLock(true); + + List result = coordinator.execute( + "run", + LanguageCode.KO, + () -> List.of(sample("run")), + Optional::empty + ); + + assertThat(result).hasSize(1); + InOrder inOrder = inOrder(redissonLock, stringRedisTemplate); + inOrder.verify(redissonLock).unlock(); + inOrder.verify(stringRedisTemplate).convertAndSend(anyString(), anyString()); + } + + @Test + @DisplayName("done publish가 실패해도 leader lock은 먼저 해제되어 있다") + void execute_releasesLeaderLockBeforePublishFailure() { stubTryLock(true); RuntimeException publishFailure = new RuntimeException("redis publish failed"); @@ -253,7 +273,9 @@ void execute_releasesLeaderLockWhenPublishFails() { ) ).isSameAs(publishFailure); - verify(redissonLock).unlock(); + InOrder inOrder = inOrder(redissonLock, stringRedisTemplate); + inOrder.verify(redissonLock).unlock(); + inOrder.verify(stringRedisTemplate).convertAndSend(anyString(), anyString()); } @Test @@ -273,8 +295,9 @@ void execute_publishesDoneWhenLockHolderFindsExistingResult() { ); assertThat(result).hasSize(1); - verify(stringRedisTemplate).convertAndSend(anyString(), anyString()); - verify(redissonLock).unlock(); + InOrder inOrder = inOrder(redissonLock, stringRedisTemplate); + inOrder.verify(redissonLock).unlock(); + inOrder.verify(stringRedisTemplate).convertAndSend(anyString(), anyString()); } private WordAnalysisResult sample(String originalForm) {