Skip to content

Commit 7a20ebb

Browse files
committed
addressing review comments
1 parent 901e64e commit 7a20ebb

5 files changed

Lines changed: 167 additions & 51 deletions

File tree

server/internal/orchestrator/swarm/orchestrator.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -594,6 +594,12 @@ func (o *Orchestrator) buildServiceInstanceResources(spec *database.ServiceInsta
594594
// instance. RAG only requires read access, so a single ServiceUserRoleRO is
595595
// created per database node using the same canonical+per-node pattern as MCP.
596596
func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstanceSpec) (*database.ServiceInstanceResources, error) {
597+
// Parse the RAG service config to extract API keys.
598+
ragConfig, errs := database.ParseRAGServiceConfig(spec.ServiceSpec.Config, false)
599+
if len(errs) > 0 {
600+
return nil, fmt.Errorf("failed to parse RAG service config: %w", errors.Join(errs...))
601+
}
602+
597603
canonicalROID := ServiceUserRoleIdentifier(spec.ServiceSpec.ServiceID, ServiceUserRoleRO)
598604

599605
// Canonical read-only role — runs on the node co-located with this instance.
@@ -623,6 +629,24 @@ func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstan
623629
})
624630
}
625631

632+
// Service data directory resource (host-side bind mount directory).
633+
dataDirID := spec.ServiceInstanceID + "-data"
634+
dataDir := &filesystem.DirResource{
635+
ID: dataDirID,
636+
HostID: spec.HostID,
637+
Path: filepath.Join(o.cfg.DataDir, "services", spec.ServiceInstanceID),
638+
}
639+
640+
// API key files resource — writes provider keys into a "keys" subdirectory.
641+
keysResource := &RAGServiceKeysResource{
642+
ServiceInstanceID: spec.ServiceInstanceID,
643+
HostID: spec.HostID,
644+
ParentID: dataDirID,
645+
Keys: extractRAGAPIKeys(ragConfig),
646+
}
647+
648+
orchestratorResources = append(orchestratorResources, dataDir, keysResource)
649+
626650
return o.buildServiceInstanceResources(spec, orchestratorResources)
627651
}
628652

server/internal/orchestrator/swarm/rag_service_keys_resource.go

Lines changed: 66 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"strings"
99

1010
"github.com/pgEdge/control-plane/server/internal/database"
11+
"github.com/pgEdge/control-plane/server/internal/filesystem"
1112
"github.com/pgEdge/control-plane/server/internal/resource"
1213
)
1314

@@ -23,23 +24,22 @@ func RAGServiceKeysResourceIdentifier(serviceInstanceID string) resource.Identif
2324
}
2425

2526
// RAGServiceKeysResource manages provider API key files on the host filesystem.
26-
// Keys are written to KeysDir and bind-mounted read-only into the RAG container.
27+
// Keys are written to a "keys" subdirectory under the service data directory
28+
// and bind-mounted read-only into the RAG container.
2729
// The directory and all files are removed when the service is deleted.
2830
type RAGServiceKeysResource struct {
2931
ServiceInstanceID string `json:"service_instance_id"`
3032
HostID string `json:"host_id"`
31-
KeysDir string `json:"keys_dir"` // absolute path on host
32-
Keys map[string]string `json:"keys"` // filename → key value
33+
ParentID string `json:"parent_id"` // DirResource ID for the service data directory
34+
Keys map[string]string `json:"keys"` // filename → key value
3335
}
3436

3537
func (r *RAGServiceKeysResource) ResourceVersion() string {
3638
return "1"
3739
}
3840

3941
func (r *RAGServiceKeysResource) DiffIgnore() []string {
40-
return []string{
41-
"/keys_dir",
42-
}
42+
return nil
4343
}
4444

4545
func (r *RAGServiceKeysResource) Identifier() resource.Identifier {
@@ -51,19 +51,30 @@ func (r *RAGServiceKeysResource) Executor() resource.Executor {
5151
}
5252

5353
func (r *RAGServiceKeysResource) Dependencies() []resource.Identifier {
54-
return nil
54+
return []resource.Identifier{
55+
filesystem.DirResourceIdentifier(r.ParentID),
56+
}
5557
}
5658

5759
func (r *RAGServiceKeysResource) TypeDependencies() []resource.Type {
5860
return nil
5961
}
6062

63+
func (r *RAGServiceKeysResource) keysDir(rc *resource.Context) (string, error) {
64+
parentPath, err := filesystem.DirResourceFullPath(rc, r.ParentID)
65+
if err != nil {
66+
return "", fmt.Errorf("failed to get service data dir path: %w", err)
67+
}
68+
return filepath.Join(parentPath, "keys"), nil
69+
}
70+
6171
func (r *RAGServiceKeysResource) Refresh(ctx context.Context, rc *resource.Context) error {
62-
if r.KeysDir == "" {
72+
keysDir, err := r.keysDir(rc)
73+
if err != nil {
6374
return resource.ErrNotFound
6475
}
6576

66-
info, err := os.Stat(r.KeysDir)
77+
info, err := os.Stat(keysDir)
6778
if os.IsNotExist(err) {
6879
return resource.ErrNotFound
6980
}
@@ -75,7 +86,7 @@ func (r *RAGServiceKeysResource) Refresh(ctx context.Context, rc *resource.Conte
7586
}
7687

7788
for name := range r.Keys {
78-
if _, err := os.Stat(filepath.Join(r.KeysDir, name)); err != nil {
89+
if _, err := os.Stat(filepath.Join(keysDir, name)); err != nil {
7990
if os.IsNotExist(err) {
8091
return resource.ErrNotFound
8192
}
@@ -87,40 +98,77 @@ func (r *RAGServiceKeysResource) Refresh(ctx context.Context, rc *resource.Conte
8798
}
8899

89100
func (r *RAGServiceKeysResource) Create(ctx context.Context, rc *resource.Context) error {
90-
if err := os.MkdirAll(r.KeysDir, 0o700); err != nil {
101+
keysDir, err := r.keysDir(rc)
102+
if err != nil {
103+
return err
104+
}
105+
if err := os.MkdirAll(keysDir, 0o700); err != nil {
91106
return fmt.Errorf("failed to create keys directory: %w", err)
92107
}
93-
return r.writeKeyFiles()
108+
return r.writeKeyFiles(keysDir)
94109
}
95110

96111
func (r *RAGServiceKeysResource) Update(ctx context.Context, rc *resource.Context) error {
97-
return r.writeKeyFiles()
112+
keysDir, err := r.keysDir(rc)
113+
if err != nil {
114+
return err
115+
}
116+
if err := r.removeStaleKeyFiles(keysDir); err != nil {
117+
return err
118+
}
119+
return r.writeKeyFiles(keysDir)
98120
}
99121

100122
func (r *RAGServiceKeysResource) Delete(ctx context.Context, rc *resource.Context) error {
101-
if r.KeysDir == "" {
123+
keysDir, err := r.keysDir(rc)
124+
if err != nil {
125+
// Parent dir is gone or unresolvable; nothing to clean up.
102126
return nil
103127
}
104-
if err := os.RemoveAll(r.KeysDir); err != nil {
128+
if err := os.RemoveAll(keysDir); err != nil {
105129
return fmt.Errorf("failed to remove keys directory: %w", err)
106130
}
107131
return nil
108132
}
109133

110-
func (r *RAGServiceKeysResource) writeKeyFiles() error {
134+
func (r *RAGServiceKeysResource) writeKeyFiles(keysDir string) error {
111135
for name, key := range r.Keys {
112136
if err := validateKeyFilename(name); err != nil {
113137
return err
114138
}
115-
path := filepath.Join(r.KeysDir, name)
139+
path := filepath.Join(keysDir, name)
116140
if err := os.WriteFile(path, []byte(key), 0o600); err != nil {
117141
return fmt.Errorf("failed to write key file %q: %w", name, err)
118142
}
119143
}
120144
return nil
121145
}
122146

123-
// validateKeyFilename rejects filenames that could escape KeysDir via path traversal.
147+
// removeStaleKeyFiles deletes key files in keysDir that are no longer in r.Keys.
148+
// This handles the case where a pipeline (and its key files) has been removed.
149+
func (r *RAGServiceKeysResource) removeStaleKeyFiles(keysDir string) error {
150+
entries, err := os.ReadDir(keysDir)
151+
if os.IsNotExist(err) {
152+
return nil
153+
}
154+
if err != nil {
155+
return fmt.Errorf("failed to read keys directory: %w", err)
156+
}
157+
for _, entry := range entries {
158+
if entry.IsDir() {
159+
continue
160+
}
161+
if _, ok := r.Keys[entry.Name()]; !ok {
162+
path := filepath.Join(keysDir, entry.Name())
163+
if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
164+
return fmt.Errorf("failed to remove stale key file %q: %w", entry.Name(), err)
165+
}
166+
}
167+
}
168+
return nil
169+
}
170+
171+
// validateKeyFilename rejects filenames that could escape the keys directory via path traversal.
124172
func validateKeyFilename(name string) error {
125173
if filepath.Clean(name) != name || filepath.IsAbs(name) || strings.ContainsAny(name, `/\`) {
126174
return fmt.Errorf("invalid key filename %q", name)

server/internal/orchestrator/swarm/rag_service_keys_resource_test.go

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"testing"
66

77
"github.com/pgEdge/control-plane/server/internal/database"
8+
"github.com/pgEdge/control-plane/server/internal/filesystem"
89
"github.com/pgEdge/control-plane/server/internal/resource"
910
)
1011

@@ -36,27 +37,60 @@ func TestRAGServiceKeysResource_Executor(t *testing.T) {
3637

3738
func TestRAGServiceKeysResource_DiffIgnore(t *testing.T) {
3839
r := &RAGServiceKeysResource{}
39-
ignored := r.DiffIgnore()
40-
if len(ignored) != 1 || ignored[0] != "/keys_dir" {
41-
t.Errorf("DiffIgnore() = %v, want [\"/keys_dir\"]", ignored)
40+
if got := r.DiffIgnore(); len(got) != 0 {
41+
t.Errorf("DiffIgnore() = %v, want empty", got)
4242
}
4343
}
4444

45-
func TestRAGServiceKeysResource_RefreshEmptyKeysDir(t *testing.T) {
45+
func TestRAGServiceKeysResource_Dependencies(t *testing.T) {
46+
r := &RAGServiceKeysResource{ParentID: "storefront-rag-host1-data"}
47+
deps := r.Dependencies()
48+
if len(deps) != 1 {
49+
t.Fatalf("Dependencies() len = %d, want 1", len(deps))
50+
}
51+
want := filesystem.DirResourceIdentifier("storefront-rag-host1-data")
52+
if deps[0] != want {
53+
t.Errorf("Dependencies()[0] = %v, want %v", deps[0], want)
54+
}
55+
}
56+
57+
// ragKeysRC returns a resource.Context with a DirResource whose FullPath is set to parentFullPath.
58+
func ragKeysRC(t *testing.T, parentID, parentFullPath string) *resource.Context {
59+
t.Helper()
60+
parentDir := &filesystem.DirResource{
61+
ID: parentID,
62+
HostID: "host-1",
63+
Path: parentFullPath,
64+
FullPath: parentFullPath,
65+
}
66+
data, err := resource.ToResourceData(parentDir)
67+
if err != nil {
68+
t.Fatalf("ToResourceData() error = %v", err)
69+
}
70+
state := resource.NewState()
71+
state.Add(data)
72+
return &resource.Context{State: state}
73+
}
74+
75+
func TestRAGServiceKeysResource_RefreshMissingParentID(t *testing.T) {
4676
r := &RAGServiceKeysResource{ServiceInstanceID: "inst1"}
47-
err := r.Refresh(context.Background(), nil)
77+
// rc with an empty State — parent dir not found → ErrNotFound
78+
err := r.Refresh(context.Background(), &resource.Context{State: resource.NewState()})
4879
if err != resource.ErrNotFound {
4980
t.Errorf("Refresh() = %v, want ErrNotFound", err)
5081
}
5182
}
5283

5384
func TestRAGServiceKeysResource_RefreshMissingDir(t *testing.T) {
85+
parentID := "inst1-data"
86+
rc := ragKeysRC(t, parentID, "/nonexistent/path/that/does/not/exist")
5487
r := &RAGServiceKeysResource{
5588
ServiceInstanceID: "inst1",
56-
KeysDir: "/nonexistent/path/that/does/not/exist/keys",
89+
HostID: "host-1",
90+
ParentID: parentID,
5791
Keys: map[string]string{"default_rag.key": "sk-test"},
5892
}
59-
err := r.Refresh(context.Background(), nil)
93+
err := r.Refresh(context.Background(), rc)
6094
if err != resource.ErrNotFound {
6195
t.Errorf("Refresh() = %v, want ErrNotFound", err)
6296
}

server/internal/orchestrator/swarm/rag_service_user_role.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func (r *RAGServiceUserRole) Create(ctx context.Context, rc *resource.Context) e
111111
Logger()
112112
logger.Info().Msg("creating RAG service user role")
113113

114-
r.Username = database.GenerateServiceUsername(r.ServiceID)
114+
r.Username = database.GenerateServiceUsername(r.ServiceID, ServiceUserRoleRO)
115115
if r.Password == "" {
116116
password, err := utils.RandomString(32)
117117
if err != nil {

0 commit comments

Comments
 (0)