feat(profiling): Add pipeline workflow to perfetto profiling #5932
markushi wants to merge 2 commits into feat/markushi/perfetto-profiling-support
Conversation
Dav1dde left a comment:
Thanks, this addresses my biggest concern from the other PR.
I think there is still some stricter typing we could do, and maybe some of the filtering logic to de-duplicate, but that is getting into maybe-not-worth-it territory at this time. I tried some things locally and quickly realized it would need more changes.
So I ended up just leaving some nits.
Regarding the order of PRs: happy to merge them separately into master, as each of the PRs is functional standalone.
We should also give other reviewers some time to take a look!
```rust
    }
}

#[derive(Clone, Debug, Serialize)]
```
Looks like this is only Serialize for a test.
It's also used for the kafka message. But granted, it feels a bit hacky, as it also relies on flattening being enabled in order to produce top-level kafka message attributes. Happy to decouple this.
Since it's only two fields, I would prefer the decoupling and specify the kafka message separately.
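A minimal sketch of what that decoupling could look like. The field names here are assumptions for illustration, not the PR's actual fields:

```rust
use serde::Serialize;

// Internal type: no longer needs to derive `Serialize` itself.
#[derive(Clone, Debug)]
struct ProfileChunk {
    organization_id: u64,
    project_id: u64,
}

// Dedicated Kafka message with explicit top-level attributes,
// removing the reliance on serde flattening.
#[derive(Debug, Serialize)]
struct ProfileChunkKafkaMessage {
    organization_id: u64,
    project_id: u64,
}

impl From<&ProfileChunk> for ProfileChunkKafkaMessage {
    fn from(chunk: &ProfileChunk) -> Self {
        Self {
            organization_id: chunk.organization_id,
            project_id: chunk.project_id,
        }
    }
}
```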
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Reviewed by Cursor Bugbot for commit 4108a6e.
```rust
        }
        q
    }
}
```
Non-aggregated quantities may cause incorrect rate limiting
Medium Severity
ExpandedProfileChunks::quantities() uses q.extend(), which appends without merging duplicate categories (e.g., three backend chunks produce [(ProfileChunk, 1), (ProfileChunk, 1), (ProfileChunk, 1)]). The equivalent SerializedProfileChunks::quantities() aggregates into [(ProfileChunk, 3)]. The CountRateLimited enforce loop calls try_consume once per entry; with Redis rate limiting, calling try_consume(1) three times versus try_consume(3) once yields different results, since quota is actually consumed on each call. The [T]::quantities() implementation in counted.rs uses a BTreeMap for proper aggregation and could be followed here.
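A self-contained sketch of the aggregation Bugbot describes, with a stand-in category enum instead of Relay's actual DataCategory:

```rust
use std::collections::BTreeMap;

// Stand-in for Relay's DataCategory, for illustration only.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum DataCategory {
    ProfileChunk,
}

/// Aggregates per-item quantities by category, mirroring the BTreeMap
/// approach of `[T]::quantities()` in counted.rs: three chunks yield
/// a single `(ProfileChunk, 3)` entry instead of three `(_, 1)` entries.
fn aggregate_quantities(
    items: impl IntoIterator<Item = (DataCategory, usize)>,
) -> Vec<(DataCategory, usize)> {
    let mut map = BTreeMap::new();
    for (category, quantity) in items {
        *map.entry(category).or_insert(0) += quantity;
    }
    map.into_iter().collect()
}

fn main() {
    let per_chunk = vec![
        (DataCategory::ProfileChunk, 1),
        (DataCategory::ProfileChunk, 1),
        (DataCategory::ProfileChunk, 1),
    ];
    // A single entry lets the enforce loop call try_consume(3) once
    // instead of try_consume(1) three times.
    assert_eq!(
        aggregate_quantities(per_chunk),
        vec![(DataCategory::ProfileChunk, 3)]
    );
}
```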
@Dav1dde out of scope for this PR, but what if we redefine Quantities as a BTreeMap of category -> usize? I encountered this question a lot when refactoring the transaction processor. That is, who does the deduplication, the caller of quantities or quantities itself?
We should use an enumap!
I personally like the small vec overall (usability, and it is actually small and on the stack); there is also no need to de-duplicate as long as it is somewhat bounded. Then again, a *Map feels slightly more appropriate.
I feel like we should make it a newtype, give it a nice API, and then the backing structure can be whatever is easiest (probably a BTreeMap). Seems like a good improvement!
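A rough sketch of that newtype idea, again with an illustrative category enum standing in for Relay's DataCategory:

```rust
use std::collections::BTreeMap;

// Stand-in for Relay's DataCategory, for illustration only.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum DataCategory {
    ProfileChunk,
    Transaction,
}

/// Newtype over a BTreeMap so callers get aggregation for free;
/// the backing structure stays an implementation detail.
#[derive(Debug, Default)]
struct Quantities(BTreeMap<DataCategory, usize>);

impl Quantities {
    /// Adds `quantity` to the running total for `category`,
    /// de-duplicating at insertion time.
    fn add(&mut self, category: DataCategory, quantity: usize) {
        *self.0.entry(category).or_insert(0) += quantity;
    }

    /// Iterates aggregated (category, total) pairs in category order.
    fn iter(&self) -> impl Iterator<Item = (DataCategory, usize)> + '_ {
        self.0.iter().map(|(&c, &q)| (c, q))
    }
}
```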
```rust
} else {
    item.set_payload(ContentType::Json, chunk.payload);
}
item
```
Reconstructed items missing platform header in serialization
Low Severity
When Expanded profile chunks are serialized back to an envelope in serialize_envelope, the reconstructed Item never has its platform set. The original items had a platform header (now required for perfetto items), but it is lost during expansion. If this path is ever used for forwarding, the downstream relay would reject perfetto items, because profile_type() returns None, which this PR explicitly makes an error condition.
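A self-contained sketch of a possible fix, using stand-in types; the chunk's `platform` field and the `set_platform` setter are assumptions here, not Relay's confirmed API:

```rust
// Stand-ins for Relay's envelope types, for illustration only.
#[derive(Clone, Debug)]
struct ExpandedChunk {
    payload: Vec<u8>,
    platform: Option<String>,
}

#[derive(Debug, Default)]
struct Item {
    payload: Vec<u8>,
    platform: Option<String>,
}

impl Item {
    fn set_payload(&mut self, payload: Vec<u8>) {
        self.payload = payload;
    }

    fn set_platform(&mut self, platform: String) {
        self.platform = Some(platform);
    }
}

/// Rebuilds an envelope item from an expanded chunk, re-attaching the
/// platform header so a downstream relay's profile_type() lookup does
/// not come back empty for perfetto items.
fn reconstruct_item(chunk: ExpandedChunk) -> Item {
    let mut item = Item::default();
    item.set_payload(chunk.payload);
    if let Some(platform) = chunk.platform {
        item.set_platform(platform);
    }
    item
}
```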
```rust
let items = e
    .chunks
    .into_iter()
    .map(|chunk| {
        let mut item = Item::new(ItemType::ProfileChunk);
        if let Some(raw_profile) = chunk.raw_profile {
            let meta_length = chunk.payload.len() as u32;
            let mut compound = bytes::BytesMut::with_capacity(
                chunk.payload.len() + raw_profile.payload.len(),
            );
            compound.extend_from_slice(&chunk.payload);
            compound.extend_from_slice(&raw_profile.payload);
            item.set_payload(raw_profile.content_type, compound.freeze());
            item.set_meta_length(meta_length);
        } else {
            item.set_payload(ContentType::Json, chunk.payload);
        }
        item
    })
    .collect();
Envelope::from_parts(e.headers, items)
})),
```
Wouldn't it be a bug to call serialize_envelope on Self::Expanded? Expanded is only for processing relays, which call forward_store, so it seems like we could do the reverse of the below and raise an internal_error in this case?
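A minimal sketch of that suggestion, with stand-in types for the PR's enum and error handling:

```rust
// Stand-ins for the PR's actual types, for illustration only.
#[derive(Debug)]
enum ProfileChunks {
    Serialized(Vec<u8>),
    Expanded(Vec<Vec<u8>>),
}

#[derive(Debug)]
struct InternalError(&'static str);

impl ProfileChunks {
    /// Expanded chunks only exist in processing relays, which forward
    /// via the store; re-serializing them would indicate a logic bug,
    /// so surface an internal error instead of reconstructing items.
    fn serialize_envelope(self) -> Result<Vec<u8>, InternalError> {
        match self {
            Self::Serialized(bytes) => Ok(bytes),
            Self::Expanded(_) => Err(InternalError(
                "expanded profile chunks must be forwarded to the store",
            )),
        }
    }
}
```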
```rust
    }),
    quantities,
})
} else {
```
nit: This if-else expression is long enough to put the two branches in separate functions IMO.
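Illustrating the nit with hypothetical helper names (not the PR's actual code):

```rust
// Hypothetical types, for illustration only.
struct Context;
struct Output;

fn expand_for_processing(_ctx: &Context) -> Output {
    // ... long processing-relay branch ...
    Output
}

fn keep_serialized(_ctx: &Context) -> Output {
    // ... long pass-through branch ...
    Output
}

// The top-level if-else stays short and scannable.
fn process(ctx: &Context, processing_enabled: bool) -> Output {
    if processing_enabled {
        expand_for_processing(ctx)
    } else {
        keep_serialized(ctx)
    }
}
```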


As a follow-up to https://github.com/getsentry/relay/pull/5659/changes/BASE..db555e68ad45debd66f46d28e84aa6952b7498b7#r3167376271, this introduces a pipeline workflow instead of doing everything in one place.