Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/queue-413-private-batch-limits.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"posthog": patch
"posthog-android": patch
---

Stop mutating user-supplied `PostHogConfig.maxBatchSize` and `PostHogConfig.flushAt` when the events queue adapts to HTTP 413 responses. The adaptive cap is now kept in private queue state, halved from the actual batch size that triggered the 413, and `flushAt` is clamped to the cap so a partial-batch 413 can't leave the queue buffering more events than a single batch can drain.
55 changes: 42 additions & 13 deletions posthog/src/main/java/com/posthog/internal/PostHogQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import java.util.TimerTask
import java.util.concurrent.ExecutorService
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.concurrent.schedule
import kotlin.math.max
import kotlin.math.min
import kotlin.math.pow

Expand All @@ -36,6 +35,7 @@ internal class PostHogQueue(
private val timerLock = Any()
private var pausedUntil: Date? = null
private var retryCount = 0
private val batchLimits = initialBatchLimits(config)
private val initialRetryDelaySeconds = 1
private val maxRetryDelaySeconds = 30

Expand Down Expand Up @@ -147,7 +147,7 @@ internal class PostHogQueue(
}

private fun flushIfOverThreshold(isFatal: Boolean) {
if (isAboveThreshold(config.flushAt)) {
if (isAboveThreshold(batchLimits.flushAt)) {
flushBatch(isFatal)
}
}
Expand All @@ -174,7 +174,7 @@ internal class PostHogQueue(
private fun takeFiles(): List<File> {
val events: List<File>
synchronized(dequeLock) {
events = deque.take(config.maxBatchSize)
events = deque.take(batchLimits.cap)
}
return events
}
Expand Down Expand Up @@ -280,7 +280,7 @@ internal class PostHogQueue(
config.logger.log("Flushed ${events.size} events successfully.")
}
} catch (e: PostHogApiError) {
deleteFiles = deleteFilesIfAPIError(e, config)
deleteFiles = deleteFilesIfAPIError(e, batchLimits, events.size, config.logger)

// only re-throw if retriable (files kept), so executeWithRetry
// can track retryCount and apply backoff
Expand Down Expand Up @@ -463,35 +463,64 @@ internal class PostHogQueue(
}
return tempFiles
}

val currentBatchCapForTesting: Int
@PostHogVisibleForTesting
get() = batchLimits.cap

val currentFlushAtForTesting: Int
@PostHogVisibleForTesting
get() = batchLimits.flushAt
}

private fun calcFloor(currentValue: Int): Int {
return max(currentValue.floorDiv(2), 1)
internal class BatchLimits(
var cap: Int,
var flushAt: Int,
) {
fun halve(actualBatchSize: Int) {
cap =
minOf(cap, actualBatchSize)
.div(2)
.coerceAtLeast(1)

// keep flushAt <= cap so we don't pile up events larger than a single batch
flushAt =
(flushAt / 2)
.coerceAtMost(cap)
.coerceAtLeast(1)
}
Comment thread
turnipdabeets marked this conversation as resolved.
}

internal fun initialBatchLimits(config: PostHogConfig) =
BatchLimits(
cap = config.maxBatchSize.coerceAtLeast(1),
flushAt = config.flushAt.coerceAtLeast(1),
)

internal fun deleteFilesIfAPIError(
e: PostHogApiError,
config: PostHogConfig,
batchLimits: BatchLimits,
actualBatchSize: Int,
logger: PostHogLogger,
): Boolean {
if (e.statusCode < 400) {
config.logger.log("Flushing failed with ${e.statusCode}, let's try again soon.")
logger.log("Flushing failed with ${e.statusCode}, let's try again soon.")

return false
}
// workaround due to png images exceed our max. limit in kafka
if (e.statusCode == 413 && config.maxBatchSize > 1) {
if (e.statusCode == 413 && batchLimits.cap > 1) {
// try to reduce the batch size and flushAt until its 1
// and if it still throws 413 in the next retry, delete the files since we cannot handle anyway
config.maxBatchSize = calcFloor(config.maxBatchSize)
config.flushAt = calcFloor(config.flushAt)
batchLimits.halve(actualBatchSize)

config.logger.log("Flushing failed with ${e.statusCode}, let's try again with a smaller batch.")
logger.log("Flushing failed with ${e.statusCode}, let's try again with a smaller batch.")

return false
}
// Transient server errors and rate limiting, retry
if (e.statusCode in RETRYABLE_STATUS_CODES) {
config.logger.log("Flushing failed with ${e.statusCode}, let's try again soon.")
logger.log("Flushing failed with ${e.statusCode}, let's try again soon.")

return false
}
Expand Down
108 changes: 96 additions & 12 deletions posthog/src/test/java/com/posthog/internal/PostHogQueueTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -394,10 +394,87 @@ internal class PostHogQueueTest {
fun `reduces batch size if 413`() {
val e = PostHogApiError(413, "", null)
val config = PostHogConfig(API_KEY)
val limits = initialBatchLimits(config)

assertFalse(deleteFilesIfAPIError(e, config))
assertEquals(config.maxBatchSize, 25) // default 50
assertEquals(config.flushAt, 10) // default 20
assertFalse(deleteFilesIfAPIError(e, limits, actualBatchSize = limits.cap, logger = config.logger))
assertEquals(limits.cap, 25) // default 50
assertEquals(limits.flushAt, 10) // default 20
assertEquals(config.maxBatchSize, 50) // unchanged
assertEquals(config.flushAt, 20) // unchanged
}

@Test
fun `halves cap from actual batch size when smaller than configured cap`() {
val e = PostHogApiError(413, "", null)
val config = PostHogConfig(API_KEY)
val limits = initialBatchLimits(config) // cap = 50

assertFalse(deleteFilesIfAPIError(e, limits, actualBatchSize = 10, logger = config.logger))
assertEquals(limits.cap, 5) // halved from min(50, 10) = 10, not from 50
}

@Test
fun `clamps flushAt to cap after halving so we don't queue more than a batch`() {
// 413 on a tiny batch shrinks cap aggressively while flushAt would only halve.
// Without clamping, flushAt could exceed cap and we'd buffer more events than
// we can ever send in a single batch.
val e = PostHogApiError(413, "", null)
val config =
PostHogConfig(API_KEY).apply {
maxBatchSize = 50
flushAt = 20
}
val limits = initialBatchLimits(config)

assertFalse(deleteFilesIfAPIError(e, limits, actualBatchSize = 2, logger = config.logger))
assertEquals(limits.cap, 1) // min(50, 2) / 2 = 1
assertEquals(limits.flushAt, 1) // would be 10 without the clamp
}

@Test
fun `halves cap repeatedly across multiple 413s through the queue flush flow`() {
// End-to-end: each successive flush() against a 413 should observe a smaller
// cap on the next attempt, proving takeFiles() reads batchLimits.cap and
// not config.maxBatchSize.
val http = mockHttp(total = 2, response = MockResponse().setResponseCode(413).setBody(""))
val url = http.url("/")

val fakeCurrentTime = FakePostHogDateProvider()
// pause time pinned to the past so 413's calculated backoff never blocks
fakeCurrentTime.setAddSecondsToCurrentDate(parseISO8601Date("1970-09-20T11:58:49.000Z")!!)

// flushAt high so add() doesn't auto-flush — drive flushes manually
val sut =
getSut(
host = url.toString(),
flushAt = 100,
dateProvider = fakeCurrentTime,
maxBatchSize = 4,
)

for (i in 0 until 4) {
sut.add(generateEvent("event$i", givenUuuid = UUID.randomUUID()))
}
executor.awaitExecution()
assertEquals(4, sut.dequeList.size)
assertEquals(4, sut.currentBatchCapForTesting)

// First flush: batch=4 → 413 → cap halves to 2, batch retained.
sut.flush()
executor.awaitExecution()
assertEquals(2, sut.currentBatchCapForTesting)
assertEquals(2, sut.currentFlushAtForTesting)
assertEquals(4, sut.dequeList.size)

// Second flush: batch=2 (using the new, smaller cap) → 413 → cap halves to 1.
sut.flush()
executor.awaitExecution()
assertEquals(1, sut.currentBatchCapForTesting)
assertEquals(1, sut.currentFlushAtForTesting)
assertEquals(4, sut.dequeList.size)

sut.clear()
executor.shutdownAndAwaitTermination()
}

@Test
Expand All @@ -408,58 +485,65 @@ internal class PostHogQueueTest {
maxBatchSize = 1
flushAt = 1
}
val limits = initialBatchLimits(config)

assertTrue(deleteFilesIfAPIError(e, config))
assertEquals(config.maxBatchSize, 1)
assertEquals(config.flushAt, 1)
assertTrue(deleteFilesIfAPIError(e, limits, actualBatchSize = 1, logger = config.logger))
assertEquals(limits.cap, 1)
assertEquals(limits.flushAt, 1)
}

@Test
fun `delete files if errored`() {
val e = PostHogApiError(400, "", null)
val config = PostHogConfig(API_KEY)
val limits = initialBatchLimits(config)

assertTrue(deleteFilesIfAPIError(e, config))
assertTrue(deleteFilesIfAPIError(e, limits, actualBatchSize = limits.cap, logger = config.logger))
}

@Test
fun `retries on 500`() {
val e = PostHogApiError(500, "", null)
val config = PostHogConfig(API_KEY)
val limits = initialBatchLimits(config)

assertFalse(deleteFilesIfAPIError(e, config))
assertFalse(deleteFilesIfAPIError(e, limits, actualBatchSize = limits.cap, logger = config.logger))
}

@Test
fun `retries on 502`() {
val e = PostHogApiError(502, "", null)
val config = PostHogConfig(API_KEY)
val limits = initialBatchLimits(config)

assertFalse(deleteFilesIfAPIError(e, config))
assertFalse(deleteFilesIfAPIError(e, limits, actualBatchSize = limits.cap, logger = config.logger))
}

@Test
fun `retries on 429`() {
val e = PostHogApiError(429, "", null)
val config = PostHogConfig(API_KEY)
val limits = initialBatchLimits(config)

assertFalse(deleteFilesIfAPIError(e, config))
assertFalse(deleteFilesIfAPIError(e, limits, actualBatchSize = limits.cap, logger = config.logger))
}

@Test
fun `retries on 504`() {
val e = PostHogApiError(504, "", null)
val config = PostHogConfig(API_KEY)
val limits = initialBatchLimits(config)

assertFalse(deleteFilesIfAPIError(e, config))
assertFalse(deleteFilesIfAPIError(e, limits, actualBatchSize = limits.cap, logger = config.logger))
}

@Test
fun `retries on 503`() {
val e = PostHogApiError(503, "", null)
val config = PostHogConfig(API_KEY)
val limits = initialBatchLimits(config)

assertFalse(deleteFilesIfAPIError(e, config))
assertFalse(deleteFilesIfAPIError(e, limits, actualBatchSize = limits.cap, logger = config.logger))
}

@Test
Expand Down
Loading