Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
f4248de
Rename remotequeries fx package
nubtron May 21, 2026
e274c20
Generalize remote query bridge integration selector
nubtron May 21, 2026
3956a2b
Keep integration selector out of Postgres executor payload
nubtron May 21, 2026
8000ca0
Remove redundant remote query credential checks
nubtron May 21, 2026
e978b84
Use strict JSON decoding for remote queries
nubtron May 21, 2026
bed8a6f
Remove Postgres remote query wrapper
nubtron May 21, 2026
2839cb0
Remove hardcoded Postgres remote query bridge
nubtron May 21, 2026
972934a
Fix rtloader test path precedence
nubtron May 21, 2026
aadc440
Add local Remote Queries PAR action proof
nubtron May 21, 2026
239e718
Add local Remote Queries live PAR proof
nubtron May 21, 2026
9f6f700
Use binary-safe Remote Queries stream events
nubtron May 21, 2026
36decf0
Add fused local Remote Queries PAR proof
nubtron May 21, 2026
da23393
Make fused proof Postgres fixture reusable
nubtron May 21, 2026
5d5306c
Remove inline Remote Queries execution path
nubtron May 21, 2026
a2f9d8a
Coalesce Remote Queries stream events for IPC
nubtron May 21, 2026
4e8c4b9
Add Remote Queries stream timing breakdown
nubtron May 21, 2026
977d61f
Add Remote Queries AgentSecure RPC bridge
nubtron May 21, 2026
3b530eb
Add standalone Remote Queries PAR proof
nubtron May 21, 2026
83f739b
Allow Remote Queries proof chunk size override
nubtron May 21, 2026
3b4a1ce
Use Postgres integration fixture in Remote Queries proof
nubtron May 21, 2026
f771381
Use practical Remote Queries proof chunk size
nubtron May 21, 2026
c4d9e5a
Clarify Remote Queries byte shift sizes
nubtron May 21, 2026
cb705bc
Query fixture table in Remote Queries proof
nubtron May 21, 2026
686602a
Add large Remote Queries payload proof cases
nubtron May 21, 2026
0081379
Fix Remote Queries streaming proof assertions
nubtron May 21, 2026
c04ad46
Add remote query Postgres match endpoint
nubtron May 21, 2026
6d5bc7f
Stream Remote Queries proof payloads to PAR
nubtron May 21, 2026
66e2a12
Stream Remote Queries COPY results through Agent
nubtron May 21, 2026
f5cdad5
Add remote query Postgres execute bridge
nubtron May 21, 2026
a3412f5
Add PAR-shaped remote query execute IPC proof
nubtron May 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/agent/subcommands/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (

doqueryactionsfx "github.com/DataDog/datadog-agent/comp/dataobs/queryactions/fx"
haagentfx "github.com/DataDog/datadog-agent/comp/haagent/fx"
remotequeriesfx "github.com/DataDog/datadog-agent/comp/remotequeries/fx"
snmpscanfx "github.com/DataDog/datadog-agent/comp/snmpscan/fx"
snmpscanmanagerfx "github.com/DataDog/datadog-agent/comp/snmpscanmanager/fx"
"github.com/DataDog/datadog-agent/pkg/aggregator"
Expand Down Expand Up @@ -567,6 +568,7 @@ func getSharedFxOption() fx.Option {
remoteagentregistryfx.Module(),
haagentfx.Module(),
doqueryactionsfx.Module(),
remotequeriesfx.Module(),
metricscompressorfx.Module(),
diagnosefx.Module(),
ipcfx.ModuleReadWrite(),
Expand Down
5 changes: 5 additions & 0 deletions comp/api/grpcserver/impl-agent/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
dogstatsdServer "github.com/DataDog/datadog-agent/comp/dogstatsd/server"
rcservice "github.com/DataDog/datadog-agent/comp/remote-config/rcservice/def"
rcservicemrf "github.com/DataDog/datadog-agent/comp/remote-config/rcservicemrf/def"
remotequeriesimpl "github.com/DataDog/datadog-agent/comp/remotequeries/impl"
pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core"
grpcutil "github.com/DataDog/datadog-agent/pkg/util/grpc"
"github.com/DataDog/datadog-agent/pkg/util/option"
Expand Down Expand Up @@ -81,6 +82,7 @@ type server struct {
telemetry telemetry.Component
hostname hostnameinterface.Component
configStream configstream.Component
remoteQueries *remotequeriesimpl.RemoteQueryExecuteService
}

func (s *server) BuildServer() http.Handler {
Expand Down Expand Up @@ -122,6 +124,7 @@ func (s *server) BuildServer() http.Handler {
autodiscovery: s.autodiscovery,
configComp: s.configComp,
configStreamServer: configstreamServer.NewServer(s.configComp, s.configStream, s.remoteAgentRegistry),
remoteQueries: s.remoteQueries,
})

return grpcServer
Expand All @@ -134,6 +137,7 @@ type Provides struct {

// NewComponent creates a new grpc component
func NewComponent(reqs Requires) (Provides, error) {
collector, _ := reqs.Collector.Get()
provides := Provides{
Comp: &server{
IPC: reqs.IPC,
Expand All @@ -152,6 +156,7 @@ func NewComponent(reqs Requires) (Provides, error) {
telemetry: reqs.Telemetry,
hostname: reqs.Hostname,
configStream: reqs.ConfigStream,
remoteQueries: remotequeriesimpl.NewRemoteQueryExecuteService(collector, reqs.Cfg.GetBool(remotequeriesimpl.RemoteQueriesExecuteEnabledConfig)),
},
}
return provides, nil
Expand Down
125 changes: 125 additions & 0 deletions comp/api/grpcserver/impl-agent/remote_query_execute_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2026-present Datadog, Inc.

package agentimpl

import (
"context"
"testing"

"google.golang.org/grpc"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/DataDog/datadog-agent/pkg/collector/check"
pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core"
)

func TestRemoteQueryExecuteResponseFromJSONMapsStructuredRows(t *testing.T) {
resp, err := remoteQueryExecuteResponseFromJSON(`{"status":"SUCCEEDED","columns":[{"name":"value","type":"integer"}],"rows":[{"value":1}],"stats":{"elapsed_ms":2},"truncated":true}`)

require.NoError(t, err)
assert.Equal(t, "SUCCEEDED", resp.GetStatus())
require.Len(t, resp.GetColumns(), 1)
assert.Equal(t, "value", resp.GetColumns()[0].AsMap()["name"])
require.Len(t, resp.GetRows(), 1)
assert.Equal(t, float64(1), resp.GetRows()[0].AsMap()["value"])
assert.True(t, resp.GetTruncated())
assert.Equal(t, float64(2), resp.GetStats().AsMap()["elapsed_ms"])
}

func TestRemoteQueryExecuteRejectsUnaryInlineMode(t *testing.T) {
resp, err := (&serverSecure{}).RemoteQueryExecute(context.Background(), &pb.RemoteQueryExecuteRequest{})

require.NoError(t, err)
assert.Equal(t, "invalid_request", resp.GetStatus())
require.NotNil(t, resp.GetError())
assert.Equal(t, "invalid_request", resp.GetError().GetCode())
assert.Contains(t, resp.GetError().GetMessage(), "RemoteQueryExecuteStream")
}

func TestRemoteQueryExecuteStreamReturnsSanitizedUnavailableWhenServiceMissing(t *testing.T) {
stream := &captureRemoteQueryExecuteStreamServer{}
err := (&serverSecure{}).RemoteQueryExecuteStream(&pb.RemoteQueryExecuteRequest{}, stream)

require.NoError(t, err)
require.Len(t, stream.chunks, 2)
assert.Equal(t, "executor_unavailable", stream.chunks[0].GetEvent().GetError().GetCode())
assert.True(t, stream.chunks[1].GetFinal())
}

func TestRemoteQueryExecuteRequestFromProtoPreservesCopyStream(t *testing.T) {
req, err := remoteQueryExecuteRequestFromProto(&pb.RemoteQueryExecuteRequest{
Integration: "postgres",
Operation: "copy_stream",
Format: "csv",
Target: &pb.RemoteQueryTarget{Host: "LOCALHOST.", Port: 5432, Dbname: "postgres"},
Query: "SELECT city, country FROM cities ORDER BY city",
CopyLimits: &pb.RemoteQueryExecuteCopyLimits{ChunkBytes: 32, MaxBytes: 1024, MaxRowBytes: 1024, TimeoutMs: 1000},
})

require.NoError(t, err)
assert.Equal(t, "postgres", req.Integration)
assert.Equal(t, "copy_stream", req.Operation)
assert.Equal(t, "csv", req.Format)
require.NotNil(t, req.CopyLimits)
assert.Equal(t, 32, req.CopyLimits.ChunkBytes)
assert.Nil(t, req.Limits)
}

func TestRemoteQueryIPCStreamCoalescerFlushesDataAtFourMiB(t *testing.T) {
stream := &captureRemoteQueryExecuteStreamServer{}
coalescer := newRemoteQueryIPCStreamCoalescer(stream)
const (
threeMiB = 3 << 20 // 3 MiB.
twoMiB = 2 << 20 // 2 MiB.
fiveMiB = 5 << 20 // 5 MiB.
)

require.NoError(t, coalescer.Send(check.RemoteQueryStreamEvent{Type: "metadata", MetadataJSON: `{"operation":"copy_stream","format":"csv"}`}))
require.NoError(t, coalescer.Send(check.RemoteQueryStreamEvent{Type: "data", MetadataJSON: `{"sequence":1,"offset":0,"bytes":3145728}`, Payload: make([]byte, threeMiB)}))
assert.Len(t, stream.chunks, 1, "data below 4MiB should be coalesced before secure IPC send")
require.NoError(t, coalescer.Send(check.RemoteQueryStreamEvent{Type: "data", MetadataJSON: `{"sequence":2,"offset":3145728,"bytes":2097152}`, Payload: make([]byte, twoMiB)}))
require.Len(t, stream.chunks, 2, "crossing 4MiB should flush one coalesced data event")
firstData := stream.chunks[1].GetEvent().GetData()
require.NotNil(t, firstData)
assert.Equal(t, uint64(0), firstData.GetOffset())
assert.Equal(t, uint64(remoteQuerySecureIPCDataFlushBytes), firstData.GetBytes())
assert.Len(t, firstData.GetPayload(), remoteQuerySecureIPCDataFlushBytes)

require.NoError(t, coalescer.Flush())
require.Len(t, stream.chunks, 3)
secondData := stream.chunks[2].GetEvent().GetData()
require.NotNil(t, secondData)
assert.Equal(t, uint64(remoteQuerySecureIPCDataFlushBytes), secondData.GetOffset())
assert.Equal(t, uint64(fiveMiB-remoteQuerySecureIPCDataFlushBytes), secondData.GetBytes())
assert.Len(t, secondData.GetPayload(), fiveMiB-remoteQuerySecureIPCDataFlushBytes)
}

func TestRemoteQueryStreamEventFromCheckEventPreservesBinaryPayload(t *testing.T) {
event, err := remoteQueryStreamEventFromCheckEvent(check.RemoteQueryStreamEvent{
Type: "data",
MetadataJSON: `{"sequence":7,"offset":11,"bytes":3}`,
Payload: []byte{0x00, 0xff, 0x80},
})

require.NoError(t, err)
assert.Equal(t, uint64(7), event.GetSequence())
require.NotNil(t, event.GetData())
assert.Equal(t, []byte{0x00, 0xff, 0x80}, event.GetData().GetPayload())
assert.Equal(t, uint64(11), event.GetData().GetOffset())
assert.Equal(t, uint64(3), event.GetData().GetBytes())
}

type captureRemoteQueryExecuteStreamServer struct {
grpc.ServerStream
chunks []*pb.RemoteQueryExecuteChunk
}

func (s *captureRemoteQueryExecuteStreamServer) Send(chunk *pb.RemoteQueryExecuteChunk) error {
s.chunks = append(s.chunks, chunk)
return nil
}
Loading
Loading