Refactor prefetcher code #818
This seems like unnecessary coupling between the behaviours of the "cache" and "fetcher", just the kind of thing you were wishing to avoid. This way around, if the cache is None (which is recommended), you may end up waiting, no? |
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #818      +/-   ##
==========================================
- Coverage   88.35%   88.29%   -0.06%
==========================================
  Files          15       15
  Lines        2989     3051      +62
==========================================
+ Hits         2641     2694      +53
- Misses        348      357       +9
|
Yes Ideally, one shouldn't layer the existing caches on top of the
This is correct, if While I agree that starting prefetching on the second read is generally more logical, we have to consider existing users who might enable this experimental flag while still using the current caches. We haven't switched the default cache setting to none yet, because removing buffering entirely could hurt performance for the large portion of our user base that does not use this experimental flag. |
|
What happens if read-ahead runs out of cache? I think it will trigger the exact same false-positive signals again? |
Yes, so basically in I'm not concerned about handling false positives here; my only concern is that prefetching shouldn't happen on the first read, because that would destroy the throughput of users doing completely random reads. If the goal is to fix this false positive, then this variable |
|
@martindurant can you review this? |
| if not available: | ||
| if self.is_producer_stopped() and self.queue.empty(): | ||
| is_producer_stopped = ( | ||
| not hasattr(self.orchestrator, "producer") |
nit: how about setting the producer to None then we do self.orchestrator.producer is None check here? It's more readable.
(or set it to None as a class attribute)
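A minimal sketch of the suggestion, for illustration only. `Orchestrator`, `Producer`, `start` and `stop` are hypothetical stand-ins, and the `or producer.is_stopped` half of the condition is an assumption, since the original expression is cut off in the hunk above.

```python
class Producer:
    """Hypothetical stand-in for the background producer."""

    def __init__(self):
        self.is_stopped = False


class Orchestrator:
    """Hypothetical stand-in; only the `producer` attribute matters here."""

    # Class-level default: the attribute always exists, so callers can
    # test `producer is None` instead of using hasattr().
    producer = None

    def start(self, producer):
        self.producer = producer

    def stop(self):
        self.producer = None


def is_producer_stopped(orchestrator):
    # Explicit None check instead of `not hasattr(orchestrator, "producer")`.
    # The is_stopped fallback is an assumption about the truncated condition.
    producer = orchestrator.producer
    return producer is None or producer.is_stopped
```

With the class attribute in place, a freshly constructed orchestrator reports "stopped" without any attribute probing, and the check reads the same before `start()` and after `stop()`.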
|
@martindurant, can you take a look if you're interested? The benchmarks before and after this change are attached in the description. |
|
The change should affect the small reads primarily? |
|
(sorry no, that comment is for #840) |
Yes, it should impact small reads primarily. |
| bp._fetch(100, 150) | ||
| # Do 6 reads to push the streak well past the MIN_STREAKS threshold | ||
| for i in range(6): | ||
| bp._fetch(i * 50, (i + 1) * 50) |
It feels like the numbers here are somewhat arbitrary - or at least, they depend on the current set of defaults assumed by the prefetcher. Maybe they should be explicitly derived from those values?
I don't mind if not, but do add a comment, because they will need to be updated should the defaults change.
Yeah, they're hardcoded, and I think the best practice is to hardcode values in the test rather than derive them from the main code. Added a comment to update them when the parent values change.
|
|
||
| assert bp._fetch(0, 100) == b"A" * 100 | ||
| for i in range(2): | ||
| bp._fetch(i * 100, (i + 1) * 100) |
Before, the read_tracker was directly mutated. Why was that insufficient?
I didn't get this, can you detail this comment more?
| for i in range(4): | ||
| bp._fetch(i * 60, (i + 1) * 60) | ||
|
|
||
| fsspec.asyn.sync(bp.loop, asyncio.sleep, 0.1) |
The sleep may need to be bigger in CI machines - can there be some form of wait here?
I think this should be fine, given that it is just a test. I've tested it on a smaller machine (4 cores) as well as a bigger machine (192 cores). This test primarily checks whether prefetching remains disabled if the average is greater than the user-specified maximum size. A 0.1s sleep gives the event loop enough time to schedule any work if it is required.
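For reference, the "some form of wait" idea could look roughly like the polling helper below. `wait_until` is a hypothetical name, not something in the PR. Note that it only helps when a test waits for something to happen; a test asserting that prefetching stays disabled still needs a bounded pause of some kind.

```python
import asyncio
import time


async def wait_until(predicate, timeout=5.0, interval=0.01):
    """Poll `predicate` until it returns True or `timeout` seconds elapse.

    Returns True if the condition was met, False if the timeout expired.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        # Yield to the event loop so background tasks can make progress.
        await asyncio.sleep(interval)
    # One last check in case the condition became true during the final sleep.
    return bool(predicate())
```

Since it is a coroutine function, it could presumably be driven the same way as the existing sleep, by passing it and a predicate to `fsspec.asyn.sync` in place of `asyncio.sleep, 0.1`; the predicate itself would depend on prefetcher internals not shown here.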
| first_val = self._history[0] | ||
| return any(val != first_val for val in self._history) |
| first_val = self._history[0] | |
| return any(val != first_val for val in self._history) | |
| return len(set(self._history)) > 1 |
?
Whether this is faster probably depends on where the first non-equal value is and how long the list can be.
I think any(...) is better: it exits early as soon as it finds the first non-equal value, and it doesn't need to allocate any extra space for a set.
My testing suggests set is always faster if the history isn't big (which it never is!) or the non-equal value is not at the start. The iterator and the set of hashes probably take up similar, negligible space.
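A small harness along these lines could settle the question for a given history length. The function names are made up for the comparison, both variants assume a non-empty history of hashable values, and no timings are claimed here since they depend on the machine and Python version.

```python
import timeit


def is_variable_any(history):
    # Early-exit variant: stops at the first value that differs from history[0].
    first_val = history[0]
    return any(val != first_val for val in history)


def is_variable_set(history):
    # Set variant: always walks the whole history, but does so in C.
    return len(set(history)) > 1


def compare(history, number=10_000):
    """Return (seconds_any, seconds_set) for `number` calls on `history`."""
    # Both variants must agree before their timings are worth comparing.
    assert is_variable_any(history) == is_variable_set(history)
    t_any = timeit.timeit(lambda: is_variable_any(history), number=number)
    t_set = timeit.timeit(lambda: is_variable_set(history), number=number)
    return t_any, t_set
```

Calling `compare([100] * 10)` and `compare([100] * 9 + [200])` covers the two cases where the early exit cannot help, while `compare([200] + [100] * 9)` covers the case where it helps most.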
| # remains the network I/O. However, for massive reads (>= 64MB), the extra | ||
| # step of copying and assembling huge byte strings in memory severely slows | ||
| # down the operation. | ||
| VARIABLE_IO_THRESHOLD = 64 * 1024 * 1024 |
Does the best value of this depend on the network bandwidth? I bet on slow connections, we always prefer any amount of prefetching and copy time is irrelevant.
Yes, this is true: for slow connections it's always best to prefetch any amount. I'm just playing it safe here by choosing a lower value, based on numbers derived from the fastest network. Do you think we should adjust this variable based on bucket type?
In general, we should be able to establish how close to the data we are running and make sensible decisions based on it. That would make nice follow-up work.
|
@martindurant, Happy Monday! Sorry for the delay, I got tied up with a few other things and couldn't follow up on this PR sooner. I've addressed your comments and included one additional small fix. (I kept them in separate commits to make reviewing easier.) Regarding the small fix: Previously, if the background prefetcher entered an error state, it would permanently reject incoming read requests. This is problematic for users who want to catch the exception and retry on their end, because any subsequent retries would just return the cached exception. The latest commit solves this by triggering a hard seek to the requested offset. The hard seek takes care of clearing the queue, resetting the internal prefetch state, and starting fresh with a new network request. Let me know what you think! |
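The retry behaviour described above can be modelled with a toy class like the one below. This is a sketch of the idea only, not the PR's implementation: `RetryablePrefetcher`, `_hard_seek` and the `read_at` callable are hypothetical, and the real prefetcher does its reading on a background loop.

```python
class RetryablePrefetcher:
    """Toy model: a stored error is cleared by a hard seek on the next read."""

    def __init__(self, read_at):
        self._read_at = read_at  # hypothetical callable: (start, end) -> bytes
        self._error = None
        self._queue = []
        self._position = 0

    def _hard_seek(self, offset):
        # Drop queued chunks, forget the old error, restart at `offset`.
        self._queue.clear()
        self._error = None
        self._position = offset

    def fetch(self, start, end):
        if self._error is not None:
            # Instead of re-raising the cached exception forever,
            # start fresh at the requested offset.
            self._hard_seek(start)
        try:
            data = self._read_at(start, end)
        except Exception as exc:
            self._error = exc  # remembered until the next fetch
            raise
        self._position = end
        return data
```

A caller can then catch the first exception and simply call `fetch` again; the second call issues a new request rather than returning the stored error.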
| assert bp._fetch(0, 10) == b"X" * 10 | ||
| assert bp._error is None | ||
|
|
||
| bp.close() |
Any problem if close() is not reached due to an error in the test? Should this be a fixture?
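One way to guarantee the cleanup, sketched with a stand-in class: wrapping the object in `contextlib.closing` calls `close()` even when the test body raises. A pytest fixture that yields the prefetcher and calls `close()` after the `yield` would give the same guarantee. `FakePrefetcher` and `failing_test_body` are hypothetical.

```python
from contextlib import closing


class FakePrefetcher:
    """Stand-in for the prefetcher; only tracks whether close() ran."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def failing_test_body(bp):
    # Simulates an assertion failing before the explicit bp.close() line.
    raise AssertionError("simulated test failure")


bp = FakePrefetcher()
try:
    with closing(bp):
        failing_test_body(bp)
except AssertionError:
    pass  # the failure still propagates out of the with-block

# close() ran despite the failure inside the with-block.
cleanup_ran = bp.closed
```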
| ) | ||
|
|
||
| # Disable prefetching ahead if variable AND average > 64MB, or if it exceeds user max | ||
| if ( |
This method got very long with all these cases, maybe should split the "choices" and the "action" parts.
| logger.debug("Producer reached EOF. Exiting background loop.") | ||
| self.is_stopped = True | ||
| break | ||
| except asyncio.CancelledError: |
This big indent feels like a context?
| break | ||
| except asyncio.CancelledError: | ||
| logger.debug("PrefetchProducer loop was cancelled.") | ||
| pass |
| is_variable = self.tracker.is_variable | ||
| avg_io_size = self.tracker.average |
May as well inline these in the calculations below, since they are already multiline. We don't use the variables again.
Summary of the changes:
Refactor Prefetcher file:
PrefetchProducer and PrefetchConsumer with direct object references. This streamlines the initialization process and makes the internal dependency graph much cleaner.
Start prefetching on 3rd read, instead of second
readahead_chunked (which generates two underlying requests for a single user read), this triggered false positive signals, causing the engine to prefetch unnecessary extra data. Delaying the start until the third read corrects this behavior.
Disable Prefetching for variable reads with average greater than 64MB
When read chunk sizes are highly variable, the producer fetches data based on a rolling average, forcing the consumer to slice and stitch multiple partial chunks to match the exact user request.
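The two behavioural changes can be summarised as a decision rule, sketched below under stated assumptions. Only `VARIABLE_IO_THRESHOLD`, `is_variable` and `average` appear in the diff; `ReadTracker`, `should_prefetch`, `MIN_READS_BEFORE_PREFETCH` and `max_prefetch_size` are illustrative names, and the sketch ignores the check that reads are actually sequential.

```python
from collections import deque

VARIABLE_IO_THRESHOLD = 64 * 1024 * 1024  # 64 MiB, value taken from the diff
MIN_READS_BEFORE_PREFETCH = 3  # assumption: "start prefetching on 3rd read"


class ReadTracker:
    """Illustrative tracker of recent read sizes (not the PR's class)."""

    def __init__(self, maxlen=10):
        self._history = deque(maxlen=maxlen)  # rolling window of sizes
        self.count = 0  # total reads seen, not capped by the window

    def record(self, size):
        self._history.append(size)
        self.count += 1

    @property
    def is_variable(self):
        # True when the window holds more than one distinct read size.
        return len(set(self._history)) > 1

    @property
    def average(self):
        return sum(self._history) / len(self._history) if self._history else 0


def should_prefetch(tracker, max_prefetch_size):
    # 1. Wait for the third read so a single user read that is split into
    #    two underlying requests cannot look like a sequential pattern.
    if tracker.count < MIN_READS_BEFORE_PREFETCH:
        return False
    avg = tracker.average
    # 2. Variable sizes with a large average: slicing and stitching chunks
    #    costs more than prefetching saves.
    if tracker.is_variable and avg > VARIABLE_IO_THRESHOLD:
        return False
    # 3. Never exceed the user-specified maximum.
    return avg <= max_prefetch_size
```

Keeping the rule in a pure function like this separates the "choices" from the "action", so the thresholds can be tested without any network I/O.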
Numbers before change: [image: Before]
Numbers after change: [image: After]