diff --git a/src/main/java/com/techfork/domain/post/batch/PostEmbeddingReader.java b/src/main/java/com/techfork/domain/post/batch/PostEmbeddingReader.java index 48dcfed..5dcf7e9 100644 --- a/src/main/java/com/techfork/domain/post/batch/PostEmbeddingReader.java +++ b/src/main/java/com/techfork/domain/post/batch/PostEmbeddingReader.java @@ -1,38 +1,60 @@ -package com.techfork.domain.post.batch; - -import com.techfork.domain.post.entity.Post; -import com.techfork.domain.post.repository.PostRepository; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.batch.item.ItemReader; -import org.springframework.data.domain.Page; -import org.springframework.data.domain.PageRequest; -import org.springframework.data.domain.Pageable; -import org.springframework.data.domain.Sort; -import org.springframework.stereotype.Component; - -import java.util.Iterator; -import java.util.List; +package com.techfork.domain.post.batch; + +import com.techfork.domain.post.entity.Post; +import com.techfork.domain.post.repository.PostRepository; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.ExecutionContext; +import org.springframework.batch.item.ItemStreamException; +import org.springframework.batch.item.ItemStreamReader; +import org.springframework.stereotype.Component; + +import java.util.Iterator; +import java.util.List; /** * 요약이 완료되고 임베딩이 필요한 Post를 읽어오는 Reader */ -@Slf4j -@Component -@RequiredArgsConstructor -public class PostEmbeddingReader implements ItemReader { - - private final PostRepository postRepository; - private Iterator postIterator; - - @Override - public Post read() { - if(postIterator == null) { - List posts = postRepository.findBySummaryIsNotNullAndEmbeddedAtIsNull(); - log.info("임베딩 대상 Post 개수: {}", posts.size()); - postIterator = posts.iterator(); - } - - return postIterator.hasNext() ? postIterator.next() : null; - } -} +@Slf4j +@Component +@StepScope +@RequiredArgsConstructor +public class PostEmbeddingReader implements ItemStreamReader { + + private final PostRepository postRepository; + private Iterator postIterator; + private boolean initialized; + + @Override + public Post read() { + if (!initialized) { + List posts = postRepository.findReadyForEmbedding(); + log.info("임베딩 대상 Post 개수: {}", posts.size()); + postIterator = posts.iterator(); + initialized = true; + } + + return postIterator.hasNext() ? postIterator.next() : null; + } + + @Override + public void open(ExecutionContext executionContext) throws ItemStreamException { + resetState(); + } + + @Override + public void update(ExecutionContext executionContext) throws ItemStreamException { + // no-op + } + + @Override + public void close() throws ItemStreamException { + resetState(); + } + + private void resetState() { + postIterator = null; + initialized = false; + } +} diff --git a/src/main/java/com/techfork/domain/post/repository/PostRepository.java b/src/main/java/com/techfork/domain/post/repository/PostRepository.java index 80cdac4..4078a97 100644 --- a/src/main/java/com/techfork/domain/post/repository/PostRepository.java +++ b/src/main/java/com/techfork/domain/post/repository/PostRepository.java @@ -28,7 +28,13 @@ public interface PostRepository extends JpaRepository { """) List findWithKeywordsBySummaryIsNull(); - List findBySummaryIsNotNullAndEmbeddedAtIsNull(); + @Query(""" + SELECT p FROM Post p + WHERE p.summary IS NOT NULL + AND LENGTH(TRIM(p.summary)) > 0 + AND p.embeddedAt IS NULL + """) + List findReadyForEmbedding(); @Modifying @Query("UPDATE Post p SET p.embeddedAt = :embeddedAt WHERE p.id IN :ids") diff --git a/src/test/java/com/techfork/domain/post/batch/PostEmbeddingProcessorTest.java b/src/test/java/com/techfork/domain/post/batch/PostEmbeddingProcessorTest.java new file mode 100644 index 0000000..9177537 --- /dev/null +++ b/src/test/java/com/techfork/domain/post/batch/PostEmbeddingProcessorTest.java @@ -0,0 +1,168 @@ +package com.techfork.domain.post.batch; + +import com.techfork.domain.post.document.ContentChunk; +import com.techfork.domain.post.document.PostDocument; +import com.techfork.domain.post.entity.Post; +import com.techfork.domain.post.fixture.PostFixture; +import com.techfork.domain.post.service.ContentChunkerService; +import com.techfork.global.llm.EmbeddingClient; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; + +import java.time.LocalDateTime; +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; + +@ExtendWith(MockitoExtension.class) +class PostEmbeddingProcessorTest { + + private static final LocalDateTime DEFAULT_PUBLISHED_AT = LocalDateTime.of(2026, 4, 13, 7, 0, 0); + + @Mock + private ContentChunkerService contentChunkerService; + + @Mock + private EmbeddingClient embeddingClient; + + @Nested + @DisplayName("process") + class Process { + + @Test + @DisplayName("제목, 요약, 유효 본문 청크를 임베딩해 PostDocument projection을 생성한다") + void createsPostDocumentProjectionFromEmbeddings() { + PostEmbeddingProcessor postEmbeddingProcessor = createProcessor(); + Post post = createPost(); + List titleEmbedding = List.of(0.1f, 0.2f); + List summaryEmbedding = List.of(0.3f, 0.4f); + List rawChunks = List.of("첫 번째 청크", "", " ", "두 번째 청크"); + List> chunkEmbeddings = List.of( + List.of(1.1f, 1.2f), + List.of(2.1f, 2.2f) + ); + + given(embeddingClient.embed("임베딩 대상 게시글")).willReturn(titleEmbedding); + given(embeddingClient.embed("요약 완료")).willReturn(summaryEmbedding); + given(contentChunkerService.chunkContent("원문 본문")).willReturn(rawChunks); + given(embeddingClient.embedBatch(List.of("첫 번째 청크", "두 번째 청크"))).willReturn(chunkEmbeddings); + + PostDocument result = postEmbeddingProcessor.process(post); + + assertThat(result).isNotNull(); + assertThat(result.getId()).isEqualTo("1"); + assertThat(result.getPostId()).isEqualTo(1L); + assertThat(result.getTitle()).isEqualTo("임베딩 대상 게시글"); + assertThat(result.getSummary()).isEqualTo("요약 완료"); + assertThat(result.getShortSummary()).isEqualTo("짧은 요약"); + assertThat(result.getCompany()).isEqualTo("TechFork"); + assertThat(result.getUrl()).isEqualTo("https://posts.example.com/1"); + assertThat(result.getLogoUrl()).isEqualTo("https://cdn.example.com/logo-1.png"); + assertThat(result.getThumbnailUrl()).isEqualTo("https://cdn.example.com/thumb-1.png"); + assertThat(result.getPublishedAt()).isEqualTo(DEFAULT_PUBLISHED_AT); + assertThat(result.getTitleEmbedding()).containsExactlyElementsOf(titleEmbedding); + assertThat(result.getSummaryEmbedding()).containsExactlyElementsOf(summaryEmbedding); + assertThat(result.getContentChunks()).hasSize(2); + assertThat(result.getContentChunks()) + .extracting(ContentChunk::getChunkOrder, ContentChunk::getChunkText, ContentChunk::getEmbedding) + .containsExactly( + org.assertj.core.groups.Tuple.tuple(0, "첫 번째 청크", List.of(1.1f, 1.2f)), + org.assertj.core.groups.Tuple.tuple(1, "두 번째 청크", List.of(2.1f, 2.2f)) + ); + + verify(embeddingClient).embed("임베딩 대상 게시글"); + verify(embeddingClient).embed("요약 완료"); + verify(contentChunkerService).chunkContent("원문 본문"); + verify(embeddingClient).embedBatch(List.of("첫 번째 청크", "두 번째 청크")); + } + + @Test + @DisplayName("제목이 비어 있으면 임베딩을 스킵하고 null을 반환한다") + void returnsNullWhenTitleIsBlank() { + PostEmbeddingProcessor postEmbeddingProcessor = createProcessor(); + Post post = createPost(); + ReflectionTestUtils.setField(post, "title", " "); + + PostDocument result = postEmbeddingProcessor.process(post); + + assertThat(result).isNull(); + verifyNoInteractions(embeddingClient, contentChunkerService); + } + + @Test + @DisplayName("요약이 비어 있으면 임베딩을 스킵하고 null을 반환한다") + void returnsNullWhenSummaryIsBlank() { + PostEmbeddingProcessor postEmbeddingProcessor = createProcessor(); + Post post = createPost(); + ReflectionTestUtils.setField(post, "summary", " "); + + PostDocument result = postEmbeddingProcessor.process(post); + + assertThat(result).isNull(); + verifyNoInteractions(embeddingClient, contentChunkerService); + } + + @Test + @DisplayName("유효한 본문 청크가 없으면 batch embedding 없이 null을 반환한다") + void returnsNullWhenNoValidChunksRemain() { + PostEmbeddingProcessor postEmbeddingProcessor = createProcessor(); + Post post = createPost(); + given(embeddingClient.embed("임베딩 대상 게시글")).willReturn(List.of(0.1f)); + given(embeddingClient.embed("요약 완료")).willReturn(List.of(0.2f)); + given(contentChunkerService.chunkContent("원문 본문")) + .willReturn(Arrays.asList("", " ", null)); + + PostDocument result = postEmbeddingProcessor.process(post); + + assertThat(result).isNull(); + verify(embeddingClient).embed("임베딩 대상 게시글"); + verify(embeddingClient).embed("요약 완료"); + verify(contentChunkerService).chunkContent("원문 본문"); + verify(embeddingClient, never()).embedBatch(anyList()); + } + + @Test + @DisplayName("임베딩 클라이언트 예외를 그대로 전파한다") + void propagatesEmbeddingClientFailure() { + PostEmbeddingProcessor postEmbeddingProcessor = createProcessor(); + Post post = createPost(); + given(embeddingClient.embed("임베딩 대상 게시글")) + .willThrow(new IllegalStateException("embedding failed")); + + assertThatThrownBy(() -> postEmbeddingProcessor.process(post)) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("embedding failed"); + verify(embeddingClient).embed("임베딩 대상 게시글"); + verify(embeddingClient, never()).embed("요약 완료"); + verifyNoInteractions(contentChunkerService); + } + + private PostEmbeddingProcessor createProcessor() { + return new PostEmbeddingProcessor(contentChunkerService, embeddingClient); + } + + private Post createPost() { + return PostFixture.createPost( + 1L, + "임베딩 대상 게시글", + "원문 본문", + "평문 본문", + "TechFork", + "요약 완료", + "짧은 요약" + ); + } + } +} diff --git a/src/test/java/com/techfork/domain/post/batch/PostEmbeddingReaderDataJpaTest.java b/src/test/java/com/techfork/domain/post/batch/PostEmbeddingReaderDataJpaTest.java new file mode 100644 index 0000000..2f5447c --- /dev/null +++ b/src/test/java/com/techfork/domain/post/batch/PostEmbeddingReaderDataJpaTest.java @@ -0,0 +1,123 @@ +package com.techfork.domain.post.batch; + +import com.techfork.domain.post.entity.Post; +import com.techfork.domain.post.repository.PostRepository; +import com.techfork.domain.source.dto.RssFeedItem; +import com.techfork.domain.source.entity.TechBlog; +import com.techfork.domain.source.repository.TechBlogRepository; +import jakarta.persistence.EntityManager; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.util.ReflectionTestUtils; + +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +@DataJpaTest +@ActiveProfiles("test") +class PostEmbeddingReaderDataJpaTest { + + @Autowired + private PostRepository postRepository; + + @Autowired + private TechBlogRepository techBlogRepository; + + @Autowired + private EntityManager entityManager; + + private TechBlog techBlog; + + @BeforeEach + void setUp() { + techBlog = techBlogRepository.save( + TechBlog.create( + "TechFork", + "https://techfork.example.com", + "https://techfork.example.com/rss", + "https://cdn.example.com/logo.png" + ) + ); + } + + @Nested + @DisplayName("read") + class Read { + + @Test + @DisplayName("요약이 공백이 아니고 embeddedAt이 null인 게시글만 읽는다") + void readsOnlyPostsReadyForEmbedding() throws Exception { + Post readyPost1 = savePost("ready-1", "요약 완료 1", "짧은 요약 1", null); + Post readyPost2 = savePost("ready-2", "요약 완료 2", "짧은 요약 2", null); + savePost("null-summary", null, null, null); + savePost("empty-summary", "", "", null); + savePost("blank-summary", " ", " ", null); + savePost("already-embedded", "이미 임베딩됨", "짧은 요약", LocalDateTime.of(2026, 5, 11, 9, 0)); + + entityManager.clear(); + + PostEmbeddingReader postEmbeddingReader = new PostEmbeddingReader(postRepository); + List readPosts = new ArrayList<>(); + + Post firstRead = postEmbeddingReader.read(); + Post secondRead = postEmbeddingReader.read(); + Post thirdRead = postEmbeddingReader.read(); + + readPosts.add(firstRead); + readPosts.add(secondRead); + + assertThat(thirdRead).isNull(); + assertThat(readPosts) + .extracting(Post::getId) + .containsExactlyInAnyOrder(readyPost1.getId(), readyPost2.getId()); + assertThat(readPosts) + .allSatisfy(post -> { + assertThat(post.getSummary()).isNotBlank(); + assertThat(post.getEmbeddedAt()).isNull(); + }); + } + + @Test + @DisplayName("조건을 만족하는 게시글이 없으면 null을 반환한다") + void returnsNullWhenNoPostsAreReadyForEmbedding() throws Exception { + savePost("null-summary", null, null, null); + savePost("empty-summary", "", "", null); + savePost("blank-summary", " ", " ", null); + savePost("already-embedded", "이미 임베딩됨", "짧은 요약", LocalDateTime.of(2026, 5, 11, 9, 0)); + + entityManager.clear(); + + PostEmbeddingReader postEmbeddingReader = new PostEmbeddingReader(postRepository); + + assertThat(postEmbeddingReader.read()).isNull(); + } + } + + private Post savePost(String suffix, String summary, String shortSummary, LocalDateTime embeddedAt) { + Post post = Post.create( + RssFeedItem.builder() + .title("임베딩 대상 글 " + suffix) + .url("https://posts.example.com/" + suffix) + .logoUrl("https://cdn.example.com/logo-" + suffix + ".png") + .thumbnailUrl("https://cdn.example.com/thumb-" + suffix + ".png") + .content("원문 본문 " + suffix) + .plainContent("평문 본문 " + suffix) + .publishedAt(LocalDateTime.of(2026, 5, 10, 10, 0)) + .company("TechFork") + .techBlogId(techBlog.getId()) + .build(), + techBlog + ); + post.updateSummaries(summary, shortSummary); + ReflectionTestUtils.setField(post, "embeddedAt", embeddedAt); + return postRepository.saveAndFlush(post); + } +} diff --git a/src/test/java/com/techfork/domain/post/batch/PostEmbeddingReaderTest.java b/src/test/java/com/techfork/domain/post/batch/PostEmbeddingReaderTest.java new file mode 100644 index 0000000..0cc3a9d --- /dev/null +++ b/src/test/java/com/techfork/domain/post/batch/PostEmbeddingReaderTest.java @@ -0,0 +1,96 @@ +package com.techfork.domain.post.batch; + +import com.techfork.domain.post.entity.Post; +import com.techfork.domain.post.fixture.PostFixture; +import com.techfork.domain.post.repository.PostRepository; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.batch.item.ExecutionContext; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; + +@ExtendWith(MockitoExtension.class) +class PostEmbeddingReaderTest { + + @Mock + private PostRepository postRepository; + + @Test + @DisplayName("생성만으로는 repository를 조회하지 않는다") + void doesNotQueryRepositoryOnConstruction() { + new PostEmbeddingReader(postRepository); + + verifyNoInteractions(postRepository); + } + + @Nested + @DisplayName("read") + class Read { + + @Test + @DisplayName("첫 read에서만 repository를 조회하고 Post를 순차적으로 반환한다") + void lazilyLoadsOnceAndReturnsPostsSequentially() { + PostEmbeddingReader postEmbeddingReader = new PostEmbeddingReader(postRepository); + Post firstPost = PostFixture.createPost(1L, "첫 번째 글", "본문1", "평문1", "TechFork", "요약1", "짧은요약1"); + Post secondPost = PostFixture.createPost(2L, "두 번째 글", "본문2", "평문2", "TechFork", "요약2", "짧은요약2"); + given(postRepository.findReadyForEmbedding()).willReturn(List.of(firstPost, secondPost)); + + Post firstRead = postEmbeddingReader.read(); + Post secondRead = postEmbeddingReader.read(); + Post thirdRead = postEmbeddingReader.read(); + + assertThat(firstRead).isSameAs(firstPost); + assertThat(secondRead).isSameAs(secondPost); + assertThat(thirdRead).isNull(); + verify(postRepository, times(1)).findReadyForEmbedding(); + } + + @Test + @DisplayName("조회 결과가 비어 있으면 null을 반환하고 다시 조회하지 않는다") + void returnsNullForEmptyRepositoryResultWithoutReloading() { + PostEmbeddingReader postEmbeddingReader = new PostEmbeddingReader(postRepository); + given(postRepository.findReadyForEmbedding()).willReturn(List.of()); + + Post firstRead = postEmbeddingReader.read(); + Post secondRead = postEmbeddingReader.read(); + + assertThat(firstRead).isNull(); + assertThat(secondRead).isNull(); + verify(postRepository, times(1)).findReadyForEmbedding(); + } + + @Test + @DisplayName("새 step execution이 시작되면 repository를 다시 조회한다") + void reloadsRepositoryWhenNewStepExecutionStarts() { + PostEmbeddingReader postEmbeddingReader = new PostEmbeddingReader(postRepository); + Post firstStepPost = PostFixture.createPost(1L, "첫 실행 글", "본문1", "평문1", "TechFork", "요약1", "짧은요약1"); + Post secondStepPost = PostFixture.createPost(2L, "두 번째 실행 글", "본문2", "평문2", "TechFork", "요약2", "짧은요약2"); + given(postRepository.findReadyForEmbedding()) + .willReturn(List.of(firstStepPost)) + .willReturn(List.of(secondStepPost)); + + postEmbeddingReader.open(new ExecutionContext()); + Post firstExecutionRead = postEmbeddingReader.read(); + Post firstExecutionEnd = postEmbeddingReader.read(); + postEmbeddingReader.close(); + + postEmbeddingReader.open(new ExecutionContext()); + Post secondExecutionRead = postEmbeddingReader.read(); + + assertThat(firstExecutionRead).isSameAs(firstStepPost); + assertThat(firstExecutionEnd).isNull(); + assertThat(secondExecutionRead).isSameAs(secondStepPost); + verify(postRepository, times(2)).findReadyForEmbedding(); + } + } +} diff --git a/src/test/java/com/techfork/domain/post/batch/PostEmbeddingWriterTest.java b/src/test/java/com/techfork/domain/post/batch/PostEmbeddingWriterTest.java new file mode 100644 index 0000000..eb9b92f --- /dev/null +++ b/src/test/java/com/techfork/domain/post/batch/PostEmbeddingWriterTest.java @@ -0,0 +1,196 @@ +package com.techfork.domain.post.batch; + +import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch._types.ErrorCause; +import co.elastic.clients.elasticsearch.core.BulkRequest; +import co.elastic.clients.elasticsearch.core.BulkResponse; +import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem; +import co.elastic.clients.elasticsearch.core.bulk.OperationType; +import co.elastic.clients.util.ObjectBuilder; +import com.techfork.domain.post.document.ContentChunk; +import com.techfork.domain.post.document.PostDocument; +import com.techfork.domain.post.entity.Post; +import com.techfork.domain.post.fixture.PostFixture; +import com.techfork.domain.post.repository.PostRepository; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.time.LocalDateTime; +import java.util.List; +import java.util.function.Function; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +@ExtendWith(MockitoExtension.class) +class PostEmbeddingWriterTest { + + @Mock + private ElasticsearchClient elasticsearchClient; + + @Mock + private PostRepository postRepository; + + @Nested + @DisplayName("write") + class Write { + + @Test + @DisplayName("빈 chunk면 Elasticsearch와 repository를 호출하지 않는다") + void doesNothingForEmptyChunk() throws Exception { + PostEmbeddingWriter postEmbeddingWriter = createWriter(); + + postEmbeddingWriter.write(org.springframework.batch.item.Chunk.of()); + + verify(elasticsearchClient, never()).bulk(any(Function.class)); + verify(postRepository, never()).bulkUpdateEmbeddedAt(any(), any()); + } + + @Test + @DisplayName("문서를 bulk index 요청으로 보내고 성공한 post id들의 embeddedAt을 갱신한다") + @SuppressWarnings({"rawtypes", "unchecked"}) + void indexesDocumentsAndUpdatesEmbeddedAtForSuccessfulIds() throws Exception { + PostEmbeddingWriter postEmbeddingWriter = createWriter(); + PostDocument firstDocument = createPostDocument(1L); + PostDocument secondDocument = createPostDocument(2L); + BulkResponse bulkResponse = BulkResponse.of(b -> b + .errors(false) + .took(1) + .items( + successItem("1"), + successItem("2") + ) + ); + doReturn(bulkResponse).when(elasticsearchClient).bulk(any(Function.class)); + ArgumentCaptor>> bulkRequestCaptor = + ArgumentCaptor.forClass(Function.class); + ArgumentCaptor> idsCaptor = ArgumentCaptor.forClass(List.class); + ArgumentCaptor embeddedAtCaptor = ArgumentCaptor.forClass(LocalDateTime.class); + + LocalDateTime beforeWrite = LocalDateTime.now(); + postEmbeddingWriter.write(org.springframework.batch.item.Chunk.of(firstDocument, secondDocument)); + LocalDateTime afterWrite = LocalDateTime.now(); + + verify(elasticsearchClient).bulk(bulkRequestCaptor.capture()); + BulkRequest bulkRequest = bulkRequestCaptor.getValue().apply(new BulkRequest.Builder()).build(); + assertThat(bulkRequest.index()).isEqualTo("posts"); + assertThat(bulkRequest.operations()).hasSize(2); + assertThat(bulkRequest.operations()) + .allSatisfy(operation -> assertThat(operation.isIndex()).isTrue()); + assertThat(bulkRequest.operations()) + .extracting(operation -> operation.index().id()) + .containsExactly("1", "2"); + assertThat(bulkRequest.operations()) + .extracting(operation -> (PostDocument) operation.index().document()) + .containsExactly(firstDocument, secondDocument); + + verify(postRepository).bulkUpdateEmbeddedAt(idsCaptor.capture(), embeddedAtCaptor.capture()); + assertThat(idsCaptor.getValue()).containsExactly(1L, 2L); + assertThat(embeddedAtCaptor.getValue()).isBetween(beforeWrite, afterWrite); + } + + @Test + @DisplayName("bulk 일부 실패 시 성공한 post id만 embeddedAt을 갱신한다") + void updatesOnlySuccessfulPostIdsWhenBulkHasFailures() throws Exception { + PostEmbeddingWriter postEmbeddingWriter = createWriter(); + PostDocument firstDocument = createPostDocument(1L); + PostDocument secondDocument = createPostDocument(2L); + BulkResponse bulkResponse = BulkResponse.of(b -> b + .errors(true) + .took(1) + .items( + successItem("1"), + failureItem("2", "mapper parsing failed") + ) + ); + doReturn(bulkResponse).when(elasticsearchClient).bulk(any(Function.class)); + ArgumentCaptor> idsCaptor = ArgumentCaptor.forClass(List.class); + + postEmbeddingWriter.write(org.springframework.batch.item.Chunk.of(firstDocument, secondDocument)); + + verify(postRepository).bulkUpdateEmbeddedAt(idsCaptor.capture(), any(LocalDateTime.class)); + assertThat(idsCaptor.getValue()).containsExactly(1L); + } + + @Test + @DisplayName("bulk 응답이 null이면 embeddedAt을 갱신하지 않는다") + void skipsEmbeddedAtUpdateWhenBulkResponseIsNull() throws Exception { + PostEmbeddingWriter postEmbeddingWriter = createWriter(); + PostDocument document = createPostDocument(1L); + doReturn(null).when(elasticsearchClient).bulk(any(Function.class)); + + postEmbeddingWriter.write(org.springframework.batch.item.Chunk.of(document)); + + verify(postRepository, never()).bulkUpdateEmbeddedAt(any(), any()); + } + + @Test + @DisplayName("성공한 문서가 없으면 embeddedAt을 갱신하지 않는다") + void skipsEmbeddedAtUpdateWhenNoDocumentSucceeded() throws Exception { + PostEmbeddingWriter postEmbeddingWriter = createWriter(); + PostDocument document = createPostDocument(1L); + BulkResponse bulkResponse = BulkResponse.of(b -> b + .errors(true) + .took(1) + .items(failureItem("1", "write failed")) + ); + doReturn(bulkResponse).when(elasticsearchClient).bulk(any(Function.class)); + + postEmbeddingWriter.write(org.springframework.batch.item.Chunk.of(document)); + + verify(postRepository, never()).bulkUpdateEmbeddedAt(any(), any()); + } + + private PostEmbeddingWriter createWriter() { + return new PostEmbeddingWriter(elasticsearchClient, postRepository); + } + + private PostDocument createPostDocument(Long id) { + Post post = PostFixture.createPost( + id, + "임베딩 대상 글 " + id, + "원문 본문 " + id, + "평문 본문 " + id, + "TechFork", + "요약 완료 " + id, + "짧은 요약 " + id + ); + return PostDocument.create( + post, + List.of(0.1f, 0.2f), + List.of(0.3f, 0.4f), + List.of(ContentChunk.create(0, "chunk-" + id, List.of(0.5f, 0.6f))) + ); + } + + private BulkResponseItem successItem(String id) { + return BulkResponseItem.of(item -> item + .id(id) + .index("posts") + .status(201) + .operationType(OperationType.Index) + ); + } + + private BulkResponseItem failureItem(String id, String reason) { + return BulkResponseItem.of(item -> item + .id(id) + .index("posts") + .status(400) + .operationType(OperationType.Index) + .error(ErrorCause.of(error -> error + .type("mapper_parsing_exception") + .reason(reason) + )) + ); + } + } +}