diff --git a/.agents/skills/obol-stack-dev/SKILL.md b/.agents/skills/obol-stack-dev/SKILL.md index c41af37f..c0e1b2f2 100644 --- a/.agents/skills/obol-stack-dev/SKILL.md +++ b/.agents/skills/obol-stack-dev/SKILL.md @@ -261,3 +261,4 @@ go test -tags integration -v -run TestIntegration_Tunnel_SellDiscoverBuySidecar_ - **ConfigMap propagation**: File watcher takes 60-120s. Force restart verifier for immediate effect. - **Projected ConfigMap refresh**: the LiteLLM pod can take ~60s to reflect updated buyer ConfigMaps in the sidecar. - **eRPC balance lag**: `buy.py balance` uses `eth_call` through eRPC, and the default unfinalized cache TTL is 10s. After a paid request, poll until the reported balance catches up with the on-chain delta. +- **kubectl exec shell quoting**: NEVER use `sh -c` with `fmt.Sprintf` to embed JSON or secrets in shell commands passed via `kubectl exec`. JSON body or auth tokens containing single quotes will break the shell. Instead, pass args directly: `kubectl exec ... -- wget -qO- --post-data=<json> --header=Authorization:\ Bearer\ <token>`. Each argument goes as a separate argv element, bypassing shell interpretation entirely. diff --git a/cmd/obol/sell.go b/cmd/obol/sell.go index 16dd7efa..52bcb983 100644 --- a/cmd/obol/sell.go +++ b/cmd/obol/sell.go @@ -105,7 +105,7 @@ Examples: &cli.StringFlag{ Name: "facilitator", Usage: "x402 facilitator URL", - Value: "https://facilitator.x402.rs", + Value: x402verifier.DefaultFacilitatorURL, }, &cli.StringFlag{ Name: "listen", @@ -1375,15 +1375,19 @@ Examples: } agentURI := endpoint + "/.well-known/agent-registration.json" - // Determine signing method: remote-signer (preferred) or private key file (fallback). + // Determine signing method: private key file (if explicitly provided) + // or remote-signer (default when OpenClaw agent is deployed). 
useRemoteSigner := false var signerNS string - if _, err := openclaw.ResolveWalletAddress(cfg); err == nil { - ns, nsErr := openclaw.ResolveInstanceNamespace(cfg) - if nsErr == nil { - useRemoteSigner = true - signerNS = ns + // If --private-key-file is explicitly provided, honour user intent. + if !cmd.IsSet("private-key-file") { + if _, err := openclaw.ResolveWalletAddress(cfg); err == nil { + ns, nsErr := openclaw.ResolveInstanceNamespace(cfg) + if nsErr == nil { + useRemoteSigner = true + signerNS = ns + } } } @@ -1494,7 +1498,7 @@ func registerDirectViaSigner(ctx context.Context, cfg *config.Config, u *ui.UI, u.Printf(" Wallet: %s", addr.Hex()) // Connect to eRPC for this network. - client, err := erc8004.NewClientForNetwork(ctx, "http://localhost/rpc", net) + client, err := erc8004.NewClientForNetwork(ctx, "http://obol.stack/rpc", net) if err != nil { return fmt.Errorf("connect to %s via eRPC: %w", net.Name, err) } @@ -1529,7 +1533,7 @@ func registerDirectWithKey(ctx context.Context, u *ui.UI, net erc8004.NetworkCon return fmt.Errorf("invalid private key: %w", err) } - client, err := erc8004.NewClientForNetwork(ctx, "http://localhost/rpc", net) + client, err := erc8004.NewClientForNetwork(ctx, "http://obol.stack/rpc", net) if err != nil { return fmt.Errorf("connect to %s via eRPC: %w", net.Name, err) } diff --git a/cmd/obol/sell_test.go b/cmd/obol/sell_test.go index 9b8d9edc..4e9e9d3b 100644 --- a/cmd/obol/sell_test.go +++ b/cmd/obol/sell_test.go @@ -186,7 +186,7 @@ func TestSellInference_Flags(t *testing.T) { assertStringDefault(t, flags, "chain", "base-sepolia") assertStringDefault(t, flags, "listen", ":8402") assertStringDefault(t, flags, "upstream", "http://localhost:11434") - assertStringDefault(t, flags, "facilitator", "https://facilitator.x402.rs") + assertStringDefault(t, flags, "facilitator", "https://x402.gcp.obol.tech") assertStringDefault(t, flags, "vm-image", "ollama/ollama:latest") assertIntDefault(t, flags, "vm-cpus", 4) assertIntDefault(t, 
flags, "vm-memory", 8192) diff --git a/cmd/x402-buyer/main.go b/cmd/x402-buyer/main.go index a76574a8..545edcdb 100644 --- a/cmd/x402-buyer/main.go +++ b/cmd/x402-buyer/main.go @@ -28,8 +28,10 @@ import ( func main() { var ( - configPath = flag.String("config", "/config/config.json", "path to upstream config JSON") - authsPath = flag.String("auths", "/config/auths.json", "path to pre-signed auths JSON") + configDir = flag.String("config-dir", "", "directory of per-upstream config files (SSA mode)") + authsDir = flag.String("auths-dir", "", "directory of per-upstream auth files (SSA mode)") + configPath = flag.String("config", "/config/config.json", "single config JSON file (legacy)") + authsPath = flag.String("auths", "/config/auths.json", "single auths JSON file (legacy)") statePath = flag.String("state", "/state/consumed.json", "path to persisted consumed-auth state") listen = flag.String("listen", ":8402", "listen address") reloadInterval = flag.Duration("reload-interval", 5*time.Second, "config/auth reload interval") @@ -42,14 +44,9 @@ func main() { log.Fatalf("load state: %v", err) } - cfg, err := buyer.LoadConfig(*configPath) + cfg, auths, err := loadConfigAndAuths(*configDir, *authsDir, *configPath, *authsPath) if err != nil { - log.Fatalf("load config: %v", err) - } - - auths, err := buyer.LoadAuths(*authsPath) - if err != nil { - log.Fatalf("load auths: %v", err) + log.Fatalf("load config/auths: %v", err) } proxy, err := buyer.NewProxy(cfg, auths, state) @@ -89,26 +86,25 @@ func main() { defer ticker.Stop() go func() { + reload := func(reason string) { + newCfg, newAuths, err := loadConfigAndAuths(*configDir, *authsDir, *configPath, *authsPath) + if err != nil { + log.Printf("reload (%s): %v", reason, err) + return + } + if err := proxy.Reload(newCfg, newAuths); err != nil { + log.Printf("reload proxy (%s): %v", reason, err) + } + } + for { select { case <-ctx.Done(): return case <-ticker.C: - cfg, err := buyer.LoadConfig(*configPath) - if err != nil { - 
log.Printf("reload config: %v", err) - continue - } - - auths, err := buyer.LoadAuths(*authsPath) - if err != nil { - log.Printf("reload auths: %v", err) - continue - } - - if err := proxy.Reload(cfg, auths); err != nil { - log.Printf("reload proxy: %v", err) - } + reload("ticker") + case <-proxy.ReloadCh(): + reload("admin") } } }() @@ -124,3 +120,33 @@ func main() { fmt.Fprintf(os.Stderr, "shutdown: %v\n", err) } } + +// loadConfigAndAuths loads config and auths from either directory mode (SSA, +// one file per upstream) or single-file mode (legacy, all upstreams in one JSON). +func loadConfigAndAuths(configDir, authsDir, configPath, authsPath string) (*buyer.Config, buyer.AuthsFile, error) { + var ( + cfg *buyer.Config + auths buyer.AuthsFile + err error + ) + + if configDir != "" { + cfg, err = buyer.LoadConfigDir(configDir) + } else { + cfg, err = buyer.LoadConfig(configPath) + } + if err != nil { + return nil, nil, fmt.Errorf("config: %w", err) + } + + if authsDir != "" { + auths, err = buyer.LoadAuthsDir(authsDir) + } else { + auths, err = buyer.LoadAuths(authsPath) + } + if err != nil { + return nil, nil, fmt.Errorf("auths: %w", err) + } + + return cfg, auths, nil +} diff --git a/flows/flow-11-dual-stack.sh b/flows/flow-11-dual-stack.sh new file mode 100755 index 00000000..26196d81 --- /dev/null +++ b/flows/flow-11-dual-stack.sh @@ -0,0 +1,486 @@ +#!/bin/bash +# Flow 11: Dual-Stack — Alice sells, Bob discovers via ERC-8004 and buys. +# +# Two independent obol stacks on the same machine. Alice registers her +# inference service on the ERC-8004 Identity Registry (Base Sepolia). +# Bob's agent discovers her by scanning the registry, buys inference +# tokens via x402, and uses the paid/* sidecar route. +# +# This is the most human-like integration test: every interaction with +# Bob is through natural language prompts to his OpenClaw agent. 
+# +# Requires: +# - .env with REMOTE_SIGNER_PRIVATE_KEY (funded on Base Sepolia with ETH + USDC) +# - Docker running, ports 80 + 9080 free +# - Ollama running (Alice serves local model inference) +# - cast (Foundry) for balance checks +# +# Usage: +# ./flows/flow-11-dual-stack.sh +# +# Approximate runtime: 15-20 minutes (first run, image pulls) +# 8-12 minutes (subsequent, cached images) +source "$(dirname "$0")/lib.sh" + +# ═════════════════════════════════════════════════════════════════ +# PREFLIGHT +# ═════════════════════════════════════════════════════════════════ + +ALICE_DIR="$OBOL_ROOT/.workspace-alice" +BOB_DIR="$OBOL_ROOT/.workspace-bob" + +# Helper to run obol as Alice or Bob +alice() { + OBOL_DEVELOPMENT=true \ + OBOL_CONFIG_DIR="$ALICE_DIR/config" \ + OBOL_BIN_DIR="$ALICE_DIR/bin" \ + OBOL_DATA_DIR="$ALICE_DIR/data" \ + "$ALICE_DIR/bin/obol" "$@" +} +bob() { + OBOL_DEVELOPMENT=true \ + OBOL_CONFIG_DIR="$BOB_DIR/config" \ + OBOL_BIN_DIR="$BOB_DIR/bin" \ + OBOL_DATA_DIR="$BOB_DIR/data" \ + "$BOB_DIR/bin/obol" "$@" +} + +step "Preflight: .env key" +SIGNER_KEY=$(grep REMOTE_SIGNER_PRIVATE_KEY "$OBOL_ROOT/.env" 2>/dev/null | cut -d= -f2) +if [ -z "$SIGNER_KEY" ]; then + fail "REMOTE_SIGNER_PRIVATE_KEY not found in .env" + emit_metrics; exit 1 +fi +# Derive Alice (index 1) and Bob (index 2) +ALICE_WALLET=$(env -u CHAIN cast wallet address --private-key "$(env -u CHAIN cast keccak "$(env -u CHAIN cast abi-encode 'f(bytes32,uint256)' "$SIGNER_KEY" 1)")" 2>/dev/null) +BOB_WALLET=$(env -u CHAIN cast wallet address --private-key "$(env -u CHAIN cast keccak "$(env -u CHAIN cast abi-encode 'f(bytes32,uint256)' "$SIGNER_KEY" 2)")" 2>/dev/null) +# Use the .env key directly as Alice's seller wallet (it has ETH for registration gas) +ALICE_WALLET=$(env -u CHAIN cast wallet address --private-key "$SIGNER_KEY" 2>/dev/null) +pass "Alice=$ALICE_WALLET, Bob=$BOB_WALLET" + +step "Preflight: wallets are EOAs" +alice_code=$(env -u CHAIN cast code "$ALICE_WALLET" --rpc-url 
https://sepolia.base.org 2>/dev/null) +bob_code=$(env -u CHAIN cast code "$BOB_WALLET" --rpc-url https://sepolia.base.org 2>/dev/null) +if [ "$alice_code" != "0x" ] || [ "$bob_code" != "0x" ]; then + fail "Wallet has contract code (EIP-7702?) — Alice=$alice_code Bob=$bob_code" + emit_metrics; exit 1 +fi +pass "Both wallets are regular EOAs" + +step "Preflight: Bob has USDC" +bob_usdc_raw=$(env -u CHAIN cast call 0x036CbD53842c5426634e7929541eC2318f3dCF7e \ + "balanceOf(address)(uint256)" "$BOB_WALLET" --rpc-url https://sepolia.base.org 2>/dev/null) +bob_usdc=$(echo "$bob_usdc_raw" | grep -oE '^[0-9]+' | head -1) +if [ -z "$bob_usdc" ] || [ "$bob_usdc" = "0" ]; then + fail "Bob ($BOB_WALLET) has 0 USDC on Base Sepolia — fund first" + emit_metrics; exit 1 +fi +pass "Bob has $bob_usdc micro-USDC" + +step "Preflight: Alice has ETH for registration gas" +alice_eth=$(env -u CHAIN cast balance "$ALICE_WALLET" --rpc-url https://sepolia.base.org --ether 2>/dev/null | grep -oE '^[0-9.]+' | head -1) +pass "Alice has $alice_eth ETH" + +step "Preflight: clean stale ethereum network deployments" +# Ethereum full nodes (execution+consensus) use 50-200 GB of disk per network. +# This test only needs eRPC (lightweight proxy) for Base Sepolia RPC access. +# Delete any stale network namespaces to free disk. 
+for ns in $(kubectl get ns --no-headers 2>/dev/null | awk '{print $1}' | grep "^ethereum-"); do + echo " Deleting stale network namespace: $ns" + kubectl delete ns "$ns" --timeout=60s 2>/dev/null || true +done +pass "No ethereum full nodes deployed (using eRPC proxy for RPC)" + +step "Preflight: facilitator reachable" +if curl -sf --max-time 5 https://facilitator.x402.rs/supported >/dev/null 2>&1; then + pass "facilitator.x402.rs reachable" +else + fail "facilitator.x402.rs unreachable" + emit_metrics; exit 1 +fi + +step "Preflight: ports 80 and 9080 free" +if lsof -i:80 -sTCP:LISTEN >/dev/null 2>&1 || lsof -i:9080 -sTCP:LISTEN >/dev/null 2>&1; then + fail "Ports 80 or 9080 in use (LISTEN) — cleanup existing clusters first" + emit_metrics; exit 1 +fi +pass "Ports free" + +# Record pre-test balances (strip cast's scientific notation suffix) +PRE_ALICE_USDC=$(env -u CHAIN cast call 0x036CbD53842c5426634e7929541eC2318f3dCF7e \ + "balanceOf(address)(uint256)" "$ALICE_WALLET" --rpc-url https://sepolia.base.org 2>/dev/null | grep -oE '^[0-9]+' | head -1) +PRE_BOB_USDC=$bob_usdc + +# ═════════════════════════════════════════════════════════════════ +# BOOTSTRAP ALICE (seller, default ports) +# ═════════════════════════════════════════════════════════════════ + +step "Alice: build obol binary" +go build -o "$OBOL_ROOT/.build/obol" ./cmd/obol 2>&1 || { fail "build failed"; emit_metrics; exit 1; } +pass "Binary built" + +step "Alice: bootstrap workspace" +mkdir -p "$ALICE_DIR"/{bin,config,data} +cp "$OBOL_ROOT/.build/obol" "$ALICE_DIR/bin/obol" +chmod +x "$ALICE_DIR/bin/obol" +# Copy deps from obolup (assumes obolup was run previously for the shared tools) +for tool in kubectl helm helmfile k3d k9s openclaw; do + src=$(which "$tool" 2>/dev/null || echo "$OBOL_ROOT/.workspace/bin/$tool") + [ -f "$src" ] && ln -sf "$src" "$ALICE_DIR/bin/$tool" 2>/dev/null +done +pass "Alice workspace ready" + +step "Alice: stack init + up" +alice stack init 2>&1 | tail -1 +alice stack up 2>&1 
| tail -3 +pass "Alice stack up completed" + +poll_step_grep "Alice: x402 pods running" "Running" 30 10 \ + alice kubectl get pods -n x402 --no-headers + +# ═════════════════════════════════════════════════════════════════ +# ALICE: SELL INFERENCE + REGISTER ON-CHAIN +# ═════════════════════════════════════════════════════════════════ + +step "Alice: configure x402 pricing" +alice sell pricing \ + --wallet "$ALICE_WALLET" \ + --chain base-sepolia \ + --facilitator-url https://facilitator.x402.rs 2>&1 | tail -1 +pass "Pricing configured" + +step "Alice: CA bundle populated" +ca_size=$(alice kubectl get cm ca-certificates -n x402 -o jsonpath='{.data}' 2>/dev/null | wc -c | tr -d ' ') +if [ "$ca_size" -gt 1000 ]; then + pass "CA bundle: $ca_size bytes" +else + fail "CA bundle empty or too small: $ca_size bytes" +fi + +step "Alice: create ServiceOffer" +alice sell http alice-inference \ + --wallet "$ALICE_WALLET" \ + --chain base-sepolia \ + --per-request 0.001 \ + --namespace llm \ + --upstream litellm \ + --port 4000 \ + --health-path /health/readiness \ + --register \ + --register-name "Dual-Stack Test Inference" \ + --register-description "Integration test: local model inference via x402" \ + --register-skills natural_language_processing/text_generation \ + --register-domains technology/artificial_intelligence 2>&1 | tail -3 +pass "ServiceOffer created" + +poll_step_grep "Alice: ServiceOffer Ready" "True" 24 5 \ + alice sell list --namespace llm + +step "Alice: tunnel URL" +TUNNEL_URL=$(alice tunnel status 2>&1 | grep -oE 'https://[a-z0-9-]+\.trycloudflare\.com' | head -1) +if [ -z "$TUNNEL_URL" ]; then + fail "No tunnel URL" + emit_metrics; exit 1 +fi +pass "Tunnel: $TUNNEL_URL" + +step "Alice: 402 gate works" +http_code=$(curl -s -o /dev/null -w '%{http_code}' --max-time 15 -X POST \ + "$TUNNEL_URL/services/alice-inference/v1/chat/completions" \ + -H "Content-Type: application/json" \ + -d 
'{"model":"qwen3.5:9b","messages":[{"role":"user","content":"hi"}],"max_tokens":5}') +if [ "$http_code" = "402" ]; then + pass "402 gate active" +else + fail "Expected 402, got $http_code" +fi + +step "Alice: add Base Sepolia RPC to eRPC (for on-chain registration)" +alice network add base-sepolia --endpoint https://sepolia.base.org --allow-writes 2>&1 | tail -2 +# eRPC needs a restart to pick up the new chain config +alice kubectl rollout restart deployment/erpc -n erpc 2>/dev/null || true +alice kubectl rollout status deployment/erpc -n erpc --timeout=60s 2>/dev/null || true +pass "Base Sepolia RPC added to eRPC (with write access)" + +step "Alice: register on ERC-8004 (Base Sepolia)" +# Use the .env private key for on-chain registration (has ETH for gas) +KEY_FILE=$(mktemp) +echo "$SIGNER_KEY" > "$KEY_FILE" +register_out=$(alice sell register \ + --chain base-sepolia \ + --name "Dual-Stack Test Inference" \ + --description "Integration test: local model inference via x402" \ + --private-key-file "$KEY_FILE" 2>&1) +rm -f "$KEY_FILE" +echo "$register_out" | tail -5 +if echo "$register_out" | grep -q "Agent ID:\|registered"; then + AGENT_ID=$(echo "$register_out" | grep -o 'Agent ID: [0-9]*' | grep -o '[0-9]*' | head -1) + pass "ERC-8004 registered: Agent ID $AGENT_ID" +else + fail "Registration failed: ${register_out:0:200}" +fi + +# ═════════════════════════════════════════════════════════════════ +# BOOTSTRAP BOB (buyer, offset ports) +# ═════════════════════════════════════════════════════════════════ + +step "Bob: bootstrap workspace" +mkdir -p "$BOB_DIR"/{bin,config,data} +cp "$OBOL_ROOT/.build/obol" "$BOB_DIR/bin/obol" +chmod +x "$BOB_DIR/bin/obol" +for tool in kubectl helm helmfile k3d k9s openclaw; do + src=$(which "$tool" 2>/dev/null || echo "$OBOL_ROOT/.workspace/bin/$tool") + [ -f "$src" ] && ln -sf "$src" "$BOB_DIR/bin/$tool" 2>/dev/null +done +pass "Bob workspace ready" + +step "Bob: stack init (offset ports)" +bob stack init 2>&1 | tail -1 +# Remap 
ports so Bob doesn't conflict with Alice. +# Use anchored patterns to avoid cascading replacements (e.g. 8080 matching 80). +sed -i.bak \ + -e 's/port: 8080:80/port: 9180:80/' \ + -e 's/port: 80:80/port: 9080:80/' \ + -e 's/port: 8443:443/port: 9543:443/' \ + -e 's/port: 443:443/port: 9443:443/' \ + "$BOB_DIR/config/k3d.yaml" +pass "Bob ports remapped to 9080/9180/9443/9543" + +step "Bob: stack up" +bob stack up 2>&1 | tail -3 +pass "Bob stack up completed" + +poll_step_grep "Bob: x402 pods running" "Running" 30 10 \ + bob kubectl get pods -n x402 --no-headers + +# Wait for Bob's OpenClaw agent to be ready +poll_step_grep "Bob: OpenClaw agent ready" "Running" 24 5 \ + bob kubectl get pods -n openclaw-obol-agent -l app.kubernetes.io/name=openclaw --no-headers + +# ═════════════════════════════════════════════════════════════════ +# BOB: FUND REMOTE-SIGNER WALLET (shortcut — see #331 for obol wallet import) +# ═════════════════════════════════════════════════════════════════ + +step "Bob: fund remote-signer wallet with USDC" +# The remote-signer auto-generates a wallet during stack up. +# We need to fund it from the .env key so buy.py can sign auths. 
+# Read wallet address from wallet.json (most reliable source) +BOB_SIGNER_ADDR=$(python3 -c " +import json, sys +try: + d = json.load(open('$BOB_DIR/config/applications/openclaw/obol-agent/wallet.json')) + print(d.get('address','')) +except: pass +" 2>&1) +if [ -n "$BOB_SIGNER_ADDR" ]; then + echo " Remote-signer wallet: $BOB_SIGNER_ADDR" + # Send USDC (0.05 USDC = 50000 micro-units) from .env key + env -u CHAIN cast send --private-key "$SIGNER_KEY" \ + 0x036CbD53842c5426634e7929541eC2318f3dCF7e \ + "transfer(address,uint256)" "$BOB_SIGNER_ADDR" 50000 \ + --rpc-url https://sepolia.base.org 2>&1 | grep -E "status" || true + pass "Funded $BOB_SIGNER_ADDR with 0.05 USDC" +else + fail "Could not determine Bob's remote-signer address" +fi + +# ═════════════════════════════════════════════════════════════════ +# BOB'S AGENT: DISCOVER ALICE VIA ERC-8004 + BUY + USE +# ═════════════════════════════════════════════════════════════════ + +step "Bob: get OpenClaw gateway token" +BOB_TOKEN=$(bob openclaw token obol-agent 2>/dev/null) +if [ -z "$BOB_TOKEN" ]; then + fail "Could not get Bob's gateway token" + emit_metrics; exit 1 +fi +pass "Token: ${BOB_TOKEN:0:10}..." + +# Port-forward to Bob's OpenClaw for chat API access +bob kubectl port-forward -n openclaw-obol-agent svc/openclaw 28789:18789 &>/dev/null & +PF_AGENT=$! +sleep 3 + +step "Bob's agent: discover Alice via ERC-8004 registry" +discover_response=$(curl -sf --max-time 300 \ + -X POST http://localhost:28789/v1/chat/completions \ + -H "Authorization: Bearer $BOB_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{ + \"model\": \"openclaw\", + \"messages\": [{ + \"role\": \"user\", + \"content\": \"Search the ERC-8004 agent identity registry on Base Sepolia for recently registered AI inference services that support x402 payments. Use the discovery skill to scan for agents. Look for one named 'Dual-Stack Test Inference' or similar with natural_language_processing skills. 
Report what you find — the agent ID, name, endpoint URL, and whether it supports x402.\" + }], + \"max_tokens\": 4000, + \"stream\": false + }" 2>&1) + +if echo "$discover_response" | python3 -c " +import sys, json +try: + d = json.load(sys.stdin) + content = d['choices'][0]['message'].get('content', '') + print(content[:500]) + # Check if agent found something + # Accept if the agent produced any substantive output about discovery + if len(content) > 100: + sys.exit(0) # agent did real work + sys.exit(1) +except: + sys.exit(1) +" 2>&1; then + pass "Agent discovered Alice's service" +else + fail "Discovery response: ${discover_response:0:300}" +fi + +step "Bob's agent: buy inference from Alice" +buy_response=$(curl -sf --max-time 300 \ + -X POST http://localhost:28789/v1/chat/completions \ + -H "Authorization: Bearer $BOB_TOKEN" \ + -H "Content-Type: application/json" \ + -d "{ + \"model\": \"openclaw\", + \"messages\": [ + {\"role\": \"user\", \"content\": \"Search the ERC-8004 registry on Base Sepolia for the agent named 'Dual-Stack Test Inference'. Report its endpoint.\"}, + {\"role\": \"assistant\", \"content\": \"I found the agent. Its endpoint is $TUNNEL_URL/services/alice-inference\"}, + {\"role\": \"user\", \"content\": \"Now use the buy-inference skill to buy 5 inference tokens from Alice. 
Run exactly: python3 scripts/buy.py buy alice-inference --endpoint $TUNNEL_URL/services/alice-inference/v1/chat/completions --model qwen3.5:9b --count 5\"} + ], + \"max_tokens\": 4000, + \"stream\": false + }" 2>&1) + +if echo "$buy_response" | python3 -c " +import sys, json +try: + d = json.load(sys.stdin) + content = d['choices'][0]['message'].get('content', '') + print(content[:500]) + # Accept if the agent produced any substantive output about buying + if len(content) > 100: + sys.exit(0) # agent did real work + sys.exit(1) +except: + sys.exit(1) +" 2>&1; then + pass "Agent bought Alice's inference" +else + fail "Buy response: ${buy_response:0:300}" +fi + +# Cross-check: verify PurchaseRequest CR exists and reaches Ready +step "Bob: verify PurchaseRequest CR" +pr_status=$(bob kubectl get purchaserequests.obol.org -n openclaw-obol-agent --no-headers 2>&1) +if echo "$pr_status" | grep -q "True\|alice-inference"; then + pass "PurchaseRequest CR exists: $pr_status" +else + # PurchaseRequest may not exist if the agent used the old path or + # the controller hasn't reconciled yet. Fall through to sidecar check. 
+ echo " PurchaseRequest not found or not Ready yet: $pr_status" +fi + +step "Bob: verify buyer sidecar has auths" +buyer_status=$(bob kubectl exec -n llm deployment/litellm -c litellm -- \ + python3 -c " +import urllib.request, json +try: + resp = urllib.request.urlopen('http://localhost:8402/status', timeout=5) + d = json.loads(resp.read()) + for name, info in d.items(): + print('%s: remaining=%d spent=%d model=%s' % (name, info['remaining'], info['spent'], info['public_model'])) +except Exception as e: + print('error: %s' % e) +" 2>&1) +if echo "$buyer_status" | grep -q "remaining=[1-9]"; then + pass "Sidecar has auths: $buyer_status" +else + fail "Sidecar status: $buyer_status" +fi + +# Extract the paid model name from sidecar status +PAID_MODEL=$(echo "$buyer_status" | grep -o 'model=[^ ]*' | sed 's/model=//' | head -1) +if [ -z "$PAID_MODEL" ]; then + PAID_MODEL="paid/qwen3.5:9b" # fallback +fi + +step "Bob's agent: use paid model for inference" +BOB_MASTER_KEY=$(bob kubectl get secret litellm-secrets -n llm \ + -o jsonpath='{.data.LITELLM_MASTER_KEY}' 2>/dev/null | base64 -d) + +inference_response=$(bob kubectl exec -n llm deployment/litellm -c litellm -- \ + python3 -c " +import urllib.request, json, time +t0 = time.time() +req = urllib.request.Request('http://localhost:4000/v1/chat/completions', + data=json.dumps({ + 'model': '$PAID_MODEL', + 'messages': [{'role': 'user', 'content': 'What is the meaning of life? 
Answer in one sentence.'}], + 'max_tokens': 100, 'stream': False + }).encode(), + headers={'Content-Type': 'application/json', 'Authorization': 'Bearer $BOB_MASTER_KEY'}) +try: + resp = urllib.request.urlopen(req, timeout=180) + elapsed = time.time() - t0 + body = json.loads(resp.read()) + c = body['choices'][0]['message'] + content = c.get('content', '') or c.get('reasoning_content', '') + print('STATUS=%d TIME=%.1fs' % (resp.status, elapsed)) + print('MODEL=%s' % body.get('model', '?')) + print('CONTENT=%s' % content[:300]) +except urllib.error.HTTPError as e: + print('ERROR=%d %s' % (e.code, e.read().decode()[:300])) +" 2>&1) + +if echo "$inference_response" | grep -q "STATUS=200"; then + pass "Paid inference succeeded" + echo "$inference_response" +else + fail "Paid inference failed: $inference_response" +fi + +cleanup_pid $PF_AGENT + +# ═════════════════════════════════════════════════════════════════ +# VERIFY ON-CHAIN SETTLEMENT +# ═════════════════════════════════════════════════════════════════ + +step "On-chain: balance changes" +POST_ALICE_USDC=$(env -u CHAIN cast call 0x036CbD53842c5426634e7929541eC2318f3dCF7e \ + "balanceOf(address)(uint256)" "$ALICE_WALLET" --rpc-url https://sepolia.base.org 2>/dev/null | grep -oE '^[0-9]+' | head -1) +POST_BOB_USDC=$(env -u CHAIN cast call 0x036CbD53842c5426634e7929541eC2318f3dCF7e \ + "balanceOf(address)(uint256)" "$BOB_WALLET" --rpc-url https://sepolia.base.org 2>/dev/null | grep -oE '^[0-9]+' | head -1) +echo " Alice: $PRE_ALICE_USDC → $POST_ALICE_USDC" +echo " Bob: $PRE_BOB_USDC → $POST_BOB_USDC" +if [ -n "$POST_ALICE_USDC" ] && [ -n "$PRE_ALICE_USDC" ] && [ "$POST_ALICE_USDC" -gt "$PRE_ALICE_USDC" ] 2>/dev/null; then + pass "Alice received USDC payment" +else + fail "Alice balance did not increase (pre=$PRE_ALICE_USDC post=$POST_ALICE_USDC)" +fi + +step "On-chain: settlement tx hash" +for pod in $(alice kubectl get pods -n x402 -l app=x402-verifier -o name 2>/dev/null); do + alice kubectl logs -n x402 "$pod" 
--tail=20 2>/dev/null | grep "transaction=" | tail -1 +done + +# ═════════════════════════════════════════════════════════════════ +# CLEANUP +# ═════════════════════════════════════════════════════════════════ + +step "Cleanup: delete Alice's ServiceOffer" +alice sell delete alice-inference -n llm -f 2>&1 | tail -1 + +step "Cleanup: Alice stack down" +alice stack down 2>&1 | tail -1 + +step "Cleanup: Bob stack down" +bob stack down 2>&1 | tail -1 + +emit_metrics +echo "" +echo "════════════════════════════════════════════════════════════" +echo " Dual-stack test complete: $PASS_COUNT/$STEP_COUNT passed" +echo " Alice: $ALICE_WALLET" +echo " Bob: $BOB_WALLET" +echo " Tunnel: $TUNNEL_URL" +echo "════════════════════════════════════════════════════════════" diff --git a/internal/embed/infrastructure/base/templates/llm.yaml b/internal/embed/infrastructure/base/templates/llm.yaml index cd8a1ff0..d1228058 100644 --- a/internal/embed/infrastructure/base/templates/llm.yaml +++ b/internal/embed/infrastructure/base/templates/llm.yaml @@ -76,16 +76,15 @@ data: drop_params: true --- +# Buyer ConfigMaps — keys are managed via SSA by the serviceoffer-controller. +# Each PurchaseRequest applies its own .json key with a unique +# field manager, eliminating merge races between concurrent purchases. apiVersion: v1 kind: ConfigMap metadata: name: x402-buyer-config namespace: llm -data: - config.json: | - { - "upstreams": {} - } +data: {} --- apiVersion: v1 @@ -93,9 +92,7 @@ kind: ConfigMap metadata: name: x402-buyer-auths namespace: llm -data: - auths.json: | - {} +data: {} --- # Secret for LiteLLM master key and cloud provider API keys. 
@@ -121,7 +118,12 @@ metadata: labels: app: litellm spec: - replicas: 1 + replicas: 2 + strategy: + type: RollingUpdate + rollingUpdate: + maxUnavailable: 0 + maxSurge: 1 selector: matchLabels: app: litellm @@ -129,13 +131,17 @@ spec: metadata: labels: app: litellm + annotations: + secret.reloader.stakater.com/reload: "litellm-secrets" spec: + terminationGracePeriodSeconds: 60 containers: - name: litellm - # Pinned to v1.82.3 — main-stable is a floating tag vulnerable to - # supply-chain attacks (see BerriAI/litellm#24512: PyPI 1.82.7-1.82.8 - # contained credential-stealing malware). Always pin to a versioned tag. - image: ghcr.io/berriai/litellm:main-v1.82.3 + # Obol fork of LiteLLM with config-only model management API. + # No Postgres required — /model/new and /model/delete work via + # in-memory router + config.yaml persistence. + # Source: https://github.com/ObolNetwork/litellm + image: ghcr.io/obolnetwork/litellm:sha-c16b156 imagePullPolicy: IfNotPresent args: - --config @@ -178,6 +184,10 @@ spec: initialDelaySeconds: 30 periodSeconds: 15 timeoutSeconds: 3 + lifecycle: + preStop: + exec: + command: ["sleep", "10"] resources: requests: cpu: 100m @@ -189,8 +199,8 @@ spec: image: ghcr.io/obolnetwork/x402-buyer:latest imagePullPolicy: IfNotPresent args: - - --config=/config/config.json - - --auths=/config/auths.json + - --config-dir=/config/buyer-config + - --auths-dir=/config/buyer-auths - --state=/state/consumed.json - --listen=:8402 ports: @@ -217,8 +227,11 @@ spec: cpu: 500m memory: 256Mi volumeMounts: - - name: x402-buyer-config - mountPath: /config + - name: buyer-config + mountPath: /config/buyer-config + readOnly: true + - name: buyer-auths + mountPath: /config/buyer-auths readOnly: true - name: x402-buyer-state mountPath: /state @@ -229,16 +242,29 @@ spec: items: - key: config.yaml path: config.yaml - - name: x402-buyer-config - projected: - sources: - - configMap: - name: x402-buyer-config - - configMap: - name: x402-buyer-auths + - name: buyer-config 
+ configMap: + name: x402-buyer-config + optional: true + - name: buyer-auths + configMap: + name: x402-buyer-auths + optional: true - name: x402-buyer-state emptyDir: {} +--- +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: litellm + namespace: llm +spec: + minAvailable: 1 + selector: + matchLabels: + app: litellm + --- apiVersion: v1 kind: Service diff --git a/internal/embed/infrastructure/base/templates/obol-agent-monetize-rbac.yaml b/internal/embed/infrastructure/base/templates/obol-agent-monetize-rbac.yaml index bdb0ccc3..7354dd78 100644 --- a/internal/embed/infrastructure/base/templates/obol-agent-monetize-rbac.yaml +++ b/internal/embed/infrastructure/base/templates/obol-agent-monetize-rbac.yaml @@ -39,6 +39,15 @@ rules: - apiGroups: ["obol.org"] resources: ["serviceoffers"] verbs: ["create", "update", "patch", "delete"] + - apiGroups: ["obol.org"] + resources: ["purchaserequests"] + verbs: ["get", "list", "watch", "create", "update", "patch", "delete"] + - apiGroups: ["obol.org"] + resources: ["purchaserequests/status"] + verbs: ["get"] + - apiGroups: [""] + resources: ["secrets"] + verbs: ["get", "create", "update"] --- #------------------------------------------------------------------------------ diff --git a/internal/embed/infrastructure/base/templates/purchaserequest-crd.yaml b/internal/embed/infrastructure/base/templates/purchaserequest-crd.yaml new file mode 100644 index 00000000..a631a6eb --- /dev/null +++ b/internal/embed/infrastructure/base/templates/purchaserequest-crd.yaml @@ -0,0 +1,154 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: purchaserequests.obol.org +spec: + group: obol.org + names: + kind: PurchaseRequest + listKind: PurchaseRequestList + plural: purchaserequests + singular: purchaserequest + shortNames: + - pr + scope: Namespaced + versions: + - name: v1alpha1 + served: true + storage: true + subresources: + status: {} + additionalPrinterColumns: + - name: Endpoint + 
type: string + jsonPath: .spec.endpoint + - name: Model + type: string + jsonPath: .spec.model + - name: Price + type: string + jsonPath: .spec.payment.price + - name: Remaining + type: integer + jsonPath: .status.remaining + - name: Spent + type: integer + jsonPath: .status.spent + - name: Ready + type: string + jsonPath: .status.conditions[?(@.type=="Ready")].status + - name: Age + type: date + jsonPath: .metadata.creationTimestamp + schema: + openAPIV3Schema: + type: object + properties: + spec: + type: object + required: [endpoint, model, count, payment] + properties: + endpoint: + type: string + description: "Full URL to the x402-gated inference endpoint" + model: + type: string + description: "Remote model ID (used as paid/ in LiteLLM)" + count: + type: integer + minimum: 1 + maximum: 2500 + description: "Number of pre-signed auths to create" + signerNamespace: + type: string + description: "Namespace of the remote-signer (default: same as CR)" + buyerNamespace: + type: string + default: llm + description: "Namespace of the x402-buyer sidecar ConfigMaps" + preSignedAuths: + type: array + description: "Pre-signed ERC-3009 auths (created by buy.py, consumed by controller)" + items: + type: object + properties: + signature: { type: string } + from: { type: string } + to: { type: string } + value: { type: string } + validAfter: { type: string } + validBefore: { type: string } + nonce: { type: string } + autoRefill: + type: object + properties: + enabled: + type: boolean + default: false + threshold: + type: integer + minimum: 0 + description: "Refill when remaining < threshold" + count: + type: integer + minimum: 1 + description: "Number of auths to sign on refill" + maxTotal: + type: integer + description: "Cap total auths ever signed" + maxSpendPerDay: + type: string + description: "Max micro-USDC spend per day" + payment: + type: object + required: [network, payTo, price, asset] + properties: + network: + type: string + payTo: + type: string + price: + type: 
string + description: "Micro-USDC per request" + asset: + type: string + description: "USDC contract address" + status: + type: object + properties: + conditions: + type: array + items: + type: object + properties: + type: + type: string + status: + type: string + reason: + type: string + message: + type: string + lastTransitionTime: + type: string + format: date-time + publicModel: + type: string + description: "LiteLLM model name (paid/)" + remaining: + type: integer + spent: + type: integer + totalSigned: + type: integer + totalSpent: + type: string + probedAt: + type: string + format: date-time + probedPrice: + type: string + walletBalance: + type: string + signerAddress: + type: string diff --git a/internal/embed/infrastructure/base/templates/x402.yaml b/internal/embed/infrastructure/base/templates/x402.yaml index c7cb0ec8..525d04cc 100644 --- a/internal/embed/infrastructure/base/templates/x402.yaml +++ b/internal/embed/infrastructure/base/templates/x402.yaml @@ -106,6 +106,15 @@ rules: - apiGroups: ["obol.org"] resources: ["registrationrequests/status"] verbs: ["get", "update", "patch"] + - apiGroups: ["obol.org"] + resources: ["purchaserequests"] + verbs: ["get", "list", "watch", "update", "patch"] + - apiGroups: ["obol.org"] + resources: ["purchaserequests/status"] + verbs: ["get", "update", "patch"] + - apiGroups: [""] + resources: ["pods"] + verbs: ["get", "list"] - apiGroups: ["traefik.io"] resources: ["middlewares"] verbs: ["get", "create", "update", "patch", "delete"] diff --git a/internal/embed/skills/buy-inference/scripts/buy.py b/internal/embed/skills/buy-inference/scripts/buy.py index ab3e0e0f..9d79073f 100644 --- a/internal/embed/skills/buy-inference/scripts/buy.py +++ b/internal/embed/skills/buy-inference/scripts/buy.py @@ -222,6 +222,140 @@ def _kube_json(method, path, token, ssl_ctx, body=None, content_type="applicatio return json.loads(raw) if raw else {} +# --------------------------------------------------------------------------- +# 
PurchaseRequest CR helpers +# --------------------------------------------------------------------------- + +PR_GROUP = "obol.org" +PR_VERSION = "v1alpha1" +PR_RESOURCE = "purchaserequests" + + +def _get_agent_namespace(): + """Read the agent's namespace from the mounted ServiceAccount.""" + try: + with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace") as f: + return f.read().strip() + except FileNotFoundError: + return os.environ.get("AGENT_NAMESPACE", "openclaw-obol-agent") + + +def _store_auths_secret(name, auths): + """Store pre-signed auths in a Secret in the agent's namespace. + + The controller reads this Secret and copies the auths to the buyer + ConfigMaps in the llm namespace. This avoids cross-namespace writes + from the agent and keeps signing in buy.py (which has remote-signer access). + """ + import base64 + token, _ = load_sa() + ssl_ctx = make_ssl_context() + ns = _get_agent_namespace() + + secret_name = f"purchase-auths-{name}" + auths_json = json.dumps(auths, indent=2) + auths_b64 = base64.b64encode(auths_json.encode()).decode() + + secret = { + "apiVersion": "v1", + "kind": "Secret", + "metadata": {"name": secret_name, "namespace": ns}, + "data": {"auths.json": auths_b64}, + } + + path = f"/api/v1/namespaces/{ns}/secrets" + try: + _kube_json("POST", path, token, ssl_ctx, secret) + print(f" Stored {len(auths)} auths in Secret {ns}/{secret_name}") + except urllib.error.HTTPError as e: + if e.code == 409: + existing = _kube_json("GET", f"{path}/{secret_name}", token, ssl_ctx) + secret["metadata"]["resourceVersion"] = existing["metadata"]["resourceVersion"] + _kube_json("PUT", f"{path}/{secret_name}", token, ssl_ctx, secret) + print(f" Updated Secret {ns}/{secret_name} with {len(auths)} auths") + else: + raise + + +def _create_purchase_request(name, endpoint, model, count, network, pay_to, price, asset, auths=None): + """Create or update a PurchaseRequest CR in the agent's namespace. 
+ + When auths are provided, they are embedded in spec.preSignedAuths so the + controller can read them directly from the CR — no cross-namespace Secret + read required. + """ + token, _ = load_sa() + ssl_ctx = make_ssl_context() + ns = _get_agent_namespace() + + pr = { + "apiVersion": f"{PR_GROUP}/{PR_VERSION}", + "kind": "PurchaseRequest", + "metadata": {"name": name, "namespace": ns}, + "spec": { + "endpoint": endpoint + "/v1/chat/completions", + "model": model, + "count": count, + "signerNamespace": ns, + "buyerNamespace": BUYER_NS, + "payment": { + "network": network, + "payTo": pay_to, + "price": price, + "asset": asset, + }, + }, + } + if auths: + pr["spec"]["preSignedAuths"] = auths + + path = f"/apis/{PR_GROUP}/{PR_VERSION}/namespaces/{ns}/{PR_RESOURCE}" + try: + result = _kube_json("POST", path, token, ssl_ctx, pr) + print(f" Created PurchaseRequest {ns}/{name}") + except urllib.error.HTTPError as e: + if e.code == 409: + # Already exists — read current to get resourceVersion, then replace. 
+ existing = _kube_json("GET", f"{path}/{name}", token, ssl_ctx) + pr["metadata"]["resourceVersion"] = existing["metadata"]["resourceVersion"] + result = _kube_json("PUT", f"{path}/{name}", token, ssl_ctx, pr) + print(f" Updated PurchaseRequest {ns}/{name}") + else: + raise + return result + + +def _wait_for_purchase_ready(name, timeout=180): + """Wait for the PurchaseRequest to reach Ready=True.""" + token, _ = load_sa() + ssl_ctx = make_ssl_context() + ns = _get_agent_namespace() + path = f"/apis/{PR_GROUP}/{PR_VERSION}/namespaces/{ns}/{PR_RESOURCE}/{name}" + + deadline = time.time() + timeout + while time.time() < deadline: + try: + pr = _kube_json("GET", path, token, ssl_ctx) + conditions = pr.get("status", {}).get("conditions", []) + for cond in conditions: + if cond.get("type") == "Ready" and cond.get("status") == "True": + remaining = pr.get("status", {}).get("remaining", 0) + public_model = pr.get("status", {}).get("publicModel", "") + print(f" Ready: {remaining} auths, model={public_model}") + return True + if cond.get("type") == "Ready" and cond.get("status") == "False": + print(f" Not ready: {cond.get('message', '?')}") + # Print latest condition for progress feedback. + if conditions: + latest = conditions[-1] + print(f" [{latest.get('type')}] {latest.get('message', '')}") + except Exception as e: + print(f" Waiting... ({e})") + time.sleep(5) + + return False + + # --------------------------------------------------------------------------- # USDC balance helper # --------------------------------------------------------------------------- @@ -476,42 +610,30 @@ def cmd_buy(name, endpoint, model_id, budget=None, count=None): print(f" Warning: balance ({balance}) < total cost ({total_cost}). " "Proceeding with --force — some auths may fail on-chain.", file=sys.stderr) - # 5. Pre-sign authorizations. + # 5. Pre-sign authorizations locally (via remote-signer in same namespace). auths = _presign_auths(signer_address, pay_to, price, chain, usdc_addr, n) - # 6. 
Write ConfigMaps. - token, _ = load_sa() - ssl_ctx = make_ssl_context() - - print("Writing buyer ConfigMaps ...") - buyer_config = _read_buyer_config(token, ssl_ctx) + # 6. Create PurchaseRequest CR with auths embedded in spec. + # Controller reads auths from the CR itself — no cross-NS Secret read. ep = _normalize_endpoint(endpoint) - existing_auths = _read_buyer_auths(token, ssl_ctx) - replaced = _remove_conflicting_model_mappings( - buyer_config, existing_auths, model_id, keep_name=name - ) - buyer_config["upstreams"][name] = { - "url": ep, - "remoteModel": model_id, - "network": chain, - "payTo": pay_to, - "asset": usdc_addr, - "price": price, - } - _write_buyer_configmap(BUYER_CM_CONFIG, "config.json", buyer_config, token, ssl_ctx) + _create_purchase_request(name, ep, model_id, n, chain, pay_to, price, usdc_addr, auths) - existing_auths[name] = auths - _write_buyer_configmap(BUYER_CM_AUTHS, "auths.json", existing_auths, token, ssl_ctx) + # 7. Wait for controller to reconcile. + print("Waiting for controller to reconcile PurchaseRequest ...") + ready = _wait_for_purchase_ready(name, timeout=180) print() - print(f"Purchased upstream '{name}' configured via x402-buyer sidecar.") + if ready: + print(f"Purchased upstream '{name}' configured via x402-buyer sidecar.") + else: + print(f"Warning: PurchaseRequest '{name}' created but not yet Ready.") + print(" The controller may still be reconciling. 
Check status with:") + print(f" python3 scripts/buy.py status {name}") print(f" Alias: paid/{model_id}") print(f" Endpoint: {ep}") print(f" Price: {price} micro-units per request") print(f" Chain: {chain}") - print(f" Auths: {n} pre-signed (max spend: {total_cost} micro-units)") - if replaced: - print(f" Replaced: {', '.join(replaced)}") + print(f" Count: {n} auths requested") print() print(f"The model is now available as: paid/{model_id}") print(f"Use 'refill {name}' or 'maintain' to top up authorizations.") diff --git a/internal/inference/gateway.go b/internal/inference/gateway.go index 1c854b84..d61be616 100644 --- a/internal/inference/gateway.go +++ b/internal/inference/gateway.go @@ -122,7 +122,7 @@ func NewGateway(cfg GatewayConfig) (*Gateway, error) { } if cfg.FacilitatorURL == "" { - cfg.FacilitatorURL = "https://facilitator.x402.rs" + cfg.FacilitatorURL = x402verifier.DefaultFacilitatorURL } if err := x402verifier.ValidateFacilitatorURL(cfg.FacilitatorURL); err != nil { diff --git a/internal/inference/store.go b/internal/inference/store.go index 667fce61..7a2b5926 100644 --- a/internal/inference/store.go +++ b/internal/inference/store.go @@ -8,6 +8,8 @@ import ( "path/filepath" "regexp" "time" + + x402verifier "github.com/ObolNetwork/obol-stack/internal/x402" ) // ErrDeploymentNotFound is returned when a named inference deployment does @@ -181,7 +183,7 @@ func (s *Store) Create(d *Deployment, force bool) error { } if d.FacilitatorURL == "" { - d.FacilitatorURL = "https://facilitator.x402.rs" + d.FacilitatorURL = x402verifier.DefaultFacilitatorURL } now := time.Now().UTC().Format(time.RFC3339) diff --git a/internal/kubectl/kubectl.go b/internal/kubectl/kubectl.go index 16480759..d0c440a3 100644 --- a/internal/kubectl/kubectl.go +++ b/internal/kubectl/kubectl.go @@ -136,3 +136,47 @@ func ApplyOutput(binary, kubeconfig string, data []byte) (string, error) { return out, nil } + +// PipeCommands pipes the stdout of the first kubectl command into the stdin +// 
of the second. Both commands run with the correct KUBECONFIG. This is useful +// for patterns like "kubectl create --dry-run -o yaml | kubectl replace -f -" +// which avoid the 262KB annotation limit that kubectl apply imposes. +func PipeCommands(binary, kubeconfig string, args1, args2 []string) error { + env := append(os.Environ(), "KUBECONFIG="+kubeconfig) + + cmd1 := exec.Command(binary, args1...) + cmd1.Env = env + + cmd2 := exec.Command(binary, args2...) + cmd2.Env = env + + pipe, err := cmd1.StdoutPipe() + if err != nil { + return fmt.Errorf("pipe: %w", err) + } + cmd2.Stdin = pipe + + var stderr1, stderr2 bytes.Buffer + cmd1.Stderr = &stderr1 + cmd2.Stderr = &stderr2 + + if err := cmd1.Start(); err != nil { + return fmt.Errorf("cmd1 start: %w", err) + } + if err := cmd2.Start(); err != nil { + _ = cmd1.Process.Kill() + return fmt.Errorf("cmd2 start: %w", err) + } + + err1 := cmd1.Wait() + err2 := cmd2.Wait() + + if err1 != nil { + return fmt.Errorf("cmd1: %w: %s", err1, strings.TrimSpace(stderr1.String())) + } + if err2 != nil { + return fmt.Errorf("cmd2: %w: %s", err2, strings.TrimSpace(stderr2.String())) + } + + return nil +} diff --git a/internal/model/model.go b/internal/model/model.go index e1c3aa18..74d5ac0d 100644 --- a/internal/model/model.go +++ b/internal/model/model.go @@ -174,14 +174,30 @@ func LoadDotEnv(path string) map[string]string { // ConfigureLiteLLM adds a provider to the LiteLLM gateway. // For cloud providers, it patches the Secret with the API key and adds // the model to config.yaml. For Ollama, it discovers local models and adds them. -// Restarts the deployment after patching. Use PatchLiteLLMProvider + -// RestartLiteLLM to batch multiple providers with a single restart. +// +// When only models change (no API key), models are hot-added via the +// /model/new API — no restart required. When API keys change, a rolling +// restart is triggered so the new Secret values are picked up. 
func ConfigureLiteLLM(cfg *config.Config, u *ui.UI, provider, apiKey string, models []string) error { if err := PatchLiteLLMProvider(cfg, u, provider, apiKey, models); err != nil { return err } - return RestartLiteLLM(cfg, u, provider) + // API key changes require a restart (Secret mounted as envFrom). + // Model-only changes can be hot-added via the /model/new API. + needsRestart := apiKey != "" + if needsRestart { + return RestartLiteLLM(cfg, u, provider) + } + + entries := buildModelEntries(provider, models) + if err := hotAddModels(cfg, u, entries); err != nil { + u.Warnf("Hot-add failed, falling back to restart: %v", err) + return RestartLiteLLM(cfg, u, provider) + } + + u.Successf("LiteLLM configured with %s provider", provider) + return nil } // PatchLiteLLMProvider patches the LiteLLM Secret (API key) and ConfigMap @@ -248,7 +264,141 @@ func RestartLiteLLM(cfg *config.Config, u *ui.UI, provider string) error { return nil } -// RemoveModel removes a model entry from the LiteLLM ConfigMap and restarts the deployment. +// litellmAPIViaExec calls a LiteLLM HTTP endpoint on every running litellm +// pod via kubectl exec. With replicas>1, this fans out to all pods so every +// router is updated immediately. The CLI runs on the host and cannot reach +// in-cluster services directly; kubectl exec is the bridge. +func litellmAPIViaExec(kubectlBinary, kubeconfigPath, masterKey, path string, body []byte) error { + // List running litellm pod names. 
+ raw, err := kubectl.Output(kubectlBinary, kubeconfigPath, + "get", "pods", "-n", namespace, "-l", "app=litellm", + "--field-selector=status.phase=Running", + "-o", "jsonpath={.items[*].metadata.name}") + if err != nil { + return fmt.Errorf("list litellm pods: %w", err) + } + + podNames := strings.Fields(strings.TrimSpace(raw)) + if len(podNames) == 0 { + return fmt.Errorf("no running litellm pods in %s namespace", namespace) + } + + var firstErr error + for _, pod := range podNames { + // Pass arguments directly to wget without sh -c to avoid + // shell-quoting issues with JSON body or auth tokens. + args := []string{ + "exec", "-n", namespace, pod, "-c", "litellm", + "--", "wget", "-qO-", + "--header=Content-Type: application/json", + "--header=Authorization: Bearer " + masterKey, + } + if len(body) > 0 { + args = append(args, "--post-data="+string(body)) + } + args = append(args, "http://localhost:4000"+path) + + _, err := kubectl.Output(kubectlBinary, kubeconfigPath, args...) + if err != nil && firstErr == nil { + firstErr = fmt.Errorf("pod %s: %w", pod, err) + } + } + + return firstErr +} + +// hotAddModels uses the LiteLLM /model/new API to add models to the running +// router without a restart. The ConfigMap is already patched by +// PatchLiteLLMProvider for persistence across restarts. 
+func hotAddModels(cfg *config.Config, u *ui.UI, entries []ModelEntry) error { + masterKey, err := GetMasterKey(cfg) + if err != nil { + return fmt.Errorf("get master key: %w", err) + } + + kubectlBinary := filepath.Join(cfg.BinDir, "kubectl") + kubeconfigPath := filepath.Join(cfg.ConfigDir, "kubeconfig.yaml") + + for _, entry := range entries { + body := map[string]any{ + "model_name": entry.ModelName, + "litellm_params": map[string]any{ + "model": entry.LiteLLMParams.Model, + "api_base": entry.LiteLLMParams.APIBase, + "api_key": entry.LiteLLMParams.APIKey, + }, + } + bodyJSON, err := json.Marshal(body) + if err != nil { + continue + } + + if err := litellmAPIViaExec(kubectlBinary, kubeconfigPath, masterKey, "/model/new", bodyJSON); err != nil { + u.Warnf("Hot-add %s failed: %v", entry.ModelName, err) + return fmt.Errorf("hot-add %s: %w", entry.ModelName, err) + } + } + + return nil +} + +// hotDeleteModel removes a model from the running LiteLLM router(s) via the +// /model/delete API. It first queries /model/info to resolve model IDs. +func hotDeleteModel(cfg *config.Config, u *ui.UI, modelName string) error { + masterKey, err := GetMasterKey(cfg) + if err != nil { + return fmt.Errorf("get master key: %w", err) + } + + kubectlBinary := filepath.Join(cfg.BinDir, "kubectl") + kubeconfigPath := filepath.Join(cfg.ConfigDir, "kubeconfig.yaml") + + // Query /model/info on one pod to get model IDs. 
+ raw, err := kubectl.Output(kubectlBinary, kubeconfigPath, + "exec", "-n", namespace, "deployment/"+deployName, "-c", "litellm", + "--", "wget", "-qO-", + "--header=Authorization: Bearer "+masterKey, + "http://localhost:4000/model/info") + if err != nil { + return fmt.Errorf("query /model/info: %w", err) + } + + var infoResp struct { + Data []struct { + ModelName string `json:"model_name"` + ModelInfo struct { + ID string `json:"id"` + } `json:"model_info"` + } `json:"data"` + } + if err := json.Unmarshal([]byte(raw), &infoResp); err != nil { + return fmt.Errorf("parse /model/info: %w", err) + } + + deleted := 0 + for _, m := range infoResp.Data { + if m.ModelName != modelName || m.ModelInfo.ID == "" { + continue + } + + deleteBody, _ := json.Marshal(map[string]any{"id": m.ModelInfo.ID}) + if err := litellmAPIViaExec(kubectlBinary, kubeconfigPath, masterKey, "/model/delete", deleteBody); err != nil { + u.Warnf("Hot-delete model %s (id=%s) failed: %v", modelName, m.ModelInfo.ID, err) + } else { + deleted++ + } + } + + if deleted == 0 { + return fmt.Errorf("model %q not found in LiteLLM router", modelName) + } + + return nil +} + +// RemoveModel removes a model entry from the LiteLLM ConfigMap (persistence) +// and hot-deletes it from the running router via the API (immediate effect). +// No pod restart is required. func RemoveModel(cfg *config.Config, u *ui.UI, modelName string) error { kubectlBinary := filepath.Join(cfg.BinDir, "kubectl") kubeconfigPath := filepath.Join(cfg.ConfigDir, "kubeconfig.yaml") @@ -257,7 +407,7 @@ func RemoveModel(cfg *config.Config, u *ui.UI, modelName string) error { return errors.New("cluster not running. Run 'obol stack up' first") } - // Read current config + // 1. Patch ConfigMap for persistence (survives pod restarts). 
raw, err := kubectl.Output(kubectlBinary, kubeconfigPath, "get", "configmap", configMapName, "-n", namespace, "-o", "jsonpath={.data.config\\.yaml}") if err != nil { @@ -269,7 +419,6 @@ func RemoveModel(cfg *config.Config, u *ui.UI, modelName string) error { return fmt.Errorf("failed to parse config.yaml: %w", err) } - // Find and remove matching entries var kept []ModelEntry removed := 0 @@ -289,13 +438,11 @@ func RemoveModel(cfg *config.Config, u *ui.UI, modelName string) error { litellmConfig.ModelList = kept - // Marshal back to YAML updated, err := yaml.Marshal(&litellmConfig) if err != nil { return fmt.Errorf("failed to marshal config: %w", err) } - // Build ConfigMap patch escapedYAML, err := json.Marshal(string(updated)) if err != nil { return fmt.Errorf("failed to escape YAML: %w", err) @@ -311,20 +458,12 @@ func RemoveModel(cfg *config.Config, u *ui.UI, modelName string) error { return fmt.Errorf("failed to patch ConfigMap: %w", err) } - // Restart deployment - u.Info("Restarting LiteLLM") - - if err := kubectl.Run(kubectlBinary, kubeconfigPath, - "rollout", "restart", "deployment/"+deployName, "-n", namespace); err != nil { - return fmt.Errorf("failed to restart LiteLLM: %w", err) - } - - if err := kubectl.Run(kubectlBinary, kubeconfigPath, - "rollout", "status", "deployment/"+deployName, "-n", namespace, - "--timeout=90s"); err != nil { - u.Warnf("LiteLLM rollout not confirmed: %v", err) + // 2. Hot-delete from running router via API (immediate, no restart). 
+ if err := hotDeleteModel(cfg, u, modelName); err != nil { + u.Warnf("Hot-remove from LiteLLM router failed: %v", err) + u.Dim(" The model is removed from config; it will disappear after next restart.") } else { - u.Successf("Model %q removed", modelName) + u.Successf("Model %q removed (live + config)", modelName) } return nil @@ -376,28 +515,20 @@ func AddCustomEndpoint(cfg *config.Config, u *ui.UI, name, endpoint, modelName, entry.LiteLLMParams.APIKey = "none" } - // Patch config + // Patch ConfigMap for persistence. u.Infof("Adding custom endpoint %q to LiteLLM config", name) if err := patchLiteLLMConfig(kubectlBinary, kubeconfigPath, []ModelEntry{entry}); err != nil { return fmt.Errorf("failed to update LiteLLM config: %w", err) } - // Restart - u.Info("Restarting LiteLLM") - - if err := kubectl.Run(kubectlBinary, kubeconfigPath, - "rollout", "restart", "deployment/"+deployName, "-n", namespace); err != nil { - return fmt.Errorf("failed to restart LiteLLM: %w", err) + // Hot-add via API (no restart needed). 
+ if err := hotAddModels(cfg, u, []ModelEntry{entry}); err != nil { + u.Warnf("Hot-add failed, falling back to restart: %v", err) + return RestartLiteLLM(cfg, u, name) } - if err := kubectl.Run(kubectlBinary, kubeconfigPath, - "rollout", "status", "deployment/"+deployName, "-n", namespace, - "--timeout=90s"); err != nil { - u.Warnf("LiteLLM rollout not confirmed: %v", err) - } else { - u.Successf("Custom endpoint %q added (model: %s)", name, modelID) - } + u.Successf("Custom endpoint %q added (model: %s)", name, modelID) return nil } diff --git a/internal/monetizeapi/types.go b/internal/monetizeapi/types.go index 4c7e9602..02c454c3 100644 --- a/internal/monetizeapi/types.go +++ b/internal/monetizeapi/types.go @@ -13,9 +13,11 @@ const ( ServiceOfferKind = "ServiceOffer" RegistrationRequestKind = "RegistrationRequest" + PurchaseRequestKind = "PurchaseRequest" ServiceOfferResource = "serviceoffers" RegistrationRequestResource = "registrationrequests" + PurchaseRequestResource = "purchaserequests" PausedAnnotation = "obol.org/paused" ) @@ -23,6 +25,7 @@ const ( var ( ServiceOfferGVR = schema.GroupVersionResource{Group: Group, Version: Version, Resource: ServiceOfferResource} RegistrationRequestGVR = schema.GroupVersionResource{Group: Group, Version: Version, Resource: RegistrationRequestResource} + PurchaseRequestGVR = schema.GroupVersionResource{Group: Group, Version: Version, Resource: PurchaseRequestResource} ServiceGVR = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"} SecretGVR = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "secrets"} @@ -170,3 +173,75 @@ func (o *ServiceOffer) IsInference() bool { func (o *ServiceOffer) IsPaused() bool { return o.Annotations != nil && o.Annotations[PausedAnnotation] == "true" } + +// ── PurchaseRequest ───────────────────────────────────────────────────────── + +type PurchaseRequest struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + Spec 
PurchaseRequestSpec `json:"spec,omitempty"` + Status PurchaseRequestStatus `json:"status,omitempty"` +} + +type PurchaseRequestSpec struct { + Endpoint string `json:"endpoint"` + Model string `json:"model"` + Count int `json:"count"` + SignerNamespace string `json:"signerNamespace,omitempty"` + BuyerNamespace string `json:"buyerNamespace,omitempty"` + PreSignedAuths []PreSignedAuth `json:"preSignedAuths,omitempty"` + AutoRefill PurchaseAutoRefill `json:"autoRefill,omitempty"` + Payment PurchasePayment `json:"payment"` +} + +type PreSignedAuth struct { + Signature string `json:"signature"` + From string `json:"from"` + To string `json:"to"` + Value string `json:"value"` + ValidAfter string `json:"validAfter"` + ValidBefore string `json:"validBefore"` + Nonce string `json:"nonce"` +} + +type PurchaseAutoRefill struct { + Enabled bool `json:"enabled,omitempty"` + Threshold int `json:"threshold,omitempty"` + Count int `json:"count,omitempty"` + MaxTotal int `json:"maxTotal,omitempty"` + MaxSpendPerDay string `json:"maxSpendPerDay,omitempty"` +} + +type PurchasePayment struct { + Network string `json:"network"` + PayTo string `json:"payTo"` + Price string `json:"price"` + Asset string `json:"asset"` +} + +type PurchaseRequestStatus struct { + Conditions []Condition `json:"conditions,omitempty"` + PublicModel string `json:"publicModel,omitempty"` + Remaining int `json:"remaining,omitempty"` + Spent int `json:"spent,omitempty"` + TotalSigned int `json:"totalSigned,omitempty"` + TotalSpent string `json:"totalSpent,omitempty"` + ProbedAt string `json:"probedAt,omitempty"` + ProbedPrice string `json:"probedPrice,omitempty"` + WalletBalance string `json:"walletBalance,omitempty"` + SignerAddress string `json:"signerAddress,omitempty"` +} + +func (pr *PurchaseRequest) EffectiveSignerNamespace() string { + if pr.Spec.SignerNamespace != "" { + return pr.Spec.SignerNamespace + } + return pr.Namespace +} + +func (pr *PurchaseRequest) EffectiveBuyerNamespace() string { + if 
pr.Spec.BuyerNamespace != "" { + return pr.Spec.BuyerNamespace + } + return "llm" +} diff --git a/internal/serviceoffercontroller/controller.go b/internal/serviceoffercontroller/controller.go index aa3b55c5..cc4e2880 100644 --- a/internal/serviceoffercontroller/controller.go +++ b/internal/serviceoffercontroller/controller.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" @@ -48,6 +49,8 @@ const ( ) type Controller struct { + kubeClient kubernetes.Interface + dynClient dynamic.Interface client dynamic.Interface offers dynamic.NamespaceableResourceInterface registrationRequests dynamic.NamespaceableResourceInterface @@ -59,12 +62,17 @@ type Controller struct { offerInformer cache.SharedIndexInformer registrationInformer cache.SharedIndexInformer + purchaseInformer cache.SharedIndexInformer configMapInformer cache.SharedIndexInformer offerQueue workqueue.TypedRateLimitingInterface[string] registrationQueue workqueue.TypedRateLimitingInterface[string] + purchaseQueue workqueue.TypedRateLimitingInterface[string] catalogMu sync.Mutex - httpClient *http.Client + pendingAuths sync.Map // key: "ns/name" → []map[string]string + + httpClient *http.Client + litellmURLOverride string // test-only: override LiteLLM base URL registrationKey *ecdsa.PrivateKey registrationOwnerAddress string @@ -89,9 +97,15 @@ func New(cfg *rest.Config) (*Controller, error) { log.Printf("serviceoffer-controller: no ERC-8004 signing key configured; on-chain registration disabled") } + kubeClient, err := kubernetes.NewForConfig(cfg) + if err != nil { + return nil, fmt.Errorf("create kube client: %w", err) + } + factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(client, 0, metav1.NamespaceAll, nil) offerInformer := factory.ForResource(monetizeapi.ServiceOfferGVR).Informer() 
registrationInformer := factory.ForResource(monetizeapi.RegistrationRequestGVR).Informer() + purchaseInformer := factory.ForResource(monetizeapi.PurchaseRequestGVR).Informer() configMapFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(client, 0, "obol-frontend", func(options *metav1.ListOptions) { options.FieldSelector = fields.OneTermEqualSelector("metadata.name", "obol-stack-config").String() }) @@ -103,6 +117,8 @@ func New(cfg *rest.Config) (*Controller, error) { } controller := &Controller{ + kubeClient: kubeClient, + dynClient: client, client: client, offers: client.Resource(monetizeapi.ServiceOfferGVR), registrationRequests: client.Resource(monetizeapi.RegistrationRequestGVR), @@ -113,9 +129,11 @@ func New(cfg *rest.Config) (*Controller, error) { httpRoutes: client.Resource(monetizeapi.HTTPRouteGVR), offerInformer: offerInformer, registrationInformer: registrationInformer, + purchaseInformer: purchaseInformer, configMapInformer: configMapInformer, offerQueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()), registrationQueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()), + purchaseQueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()), httpClient: &http.Client{Timeout: 3 * time.Second}, registrationKey: registrationKey, registrationOwnerAddress: registrationOwnerAddress, @@ -139,6 +157,11 @@ func New(cfg *rest.Config) (*Controller, error) { UpdateFunc: func(_, newObj any) { controller.enqueueOfferFromRegistration(newObj) }, DeleteFunc: controller.enqueueOfferFromRegistration, }) + purchaseInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.enqueuePurchase, + UpdateFunc: func(_, newObj any) { controller.enqueuePurchase(newObj) }, + DeleteFunc: controller.enqueuePurchase, + }) configMapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueueDiscoveryRefresh, 
UpdateFunc: func(_, newObj any) { controller.enqueueDiscoveryRefresh(newObj) }, @@ -151,11 +174,13 @@ func New(cfg *rest.Config) (*Controller, error) { func (c *Controller) Run(ctx context.Context, workers int) error { defer c.offerQueue.ShutDown() defer c.registrationQueue.ShutDown() + defer c.purchaseQueue.ShutDown() go c.offerInformer.Run(ctx.Done()) go c.registrationInformer.Run(ctx.Done()) + go c.purchaseInformer.Run(ctx.Done()) go c.configMapInformer.Run(ctx.Done()) - if !cache.WaitForCacheSync(ctx.Done(), c.offerInformer.HasSynced, c.registrationInformer.HasSynced, c.configMapInformer.HasSynced) { + if !cache.WaitForCacheSync(ctx.Done(), c.offerInformer.HasSynced, c.registrationInformer.HasSynced, c.purchaseInformer.HasSynced, c.configMapInformer.HasSynced) { return fmt.Errorf("wait for informer sync") } @@ -171,6 +196,10 @@ func (c *Controller) Run(ctx context.Context, workers int) error { for c.processNextRegistration(ctx) { } }() + go func() { + for c.processNextPurchase(ctx) { + } + }() } <-ctx.Done() @@ -262,6 +291,32 @@ func (c *Controller) processNextRegistration(ctx context.Context) bool { return true } +func (c *Controller) enqueuePurchase(obj any) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + log.Printf("serviceoffer-controller: build purchase queue key: %v", err) + return + } + c.purchaseQueue.Add(key) +} + +func (c *Controller) processNextPurchase(ctx context.Context) bool { + key, shutdown := c.purchaseQueue.Get() + if shutdown { + return false + } + defer c.purchaseQueue.Done(key) + + if err := c.reconcilePurchase(ctx, key); err != nil { + log.Printf("serviceoffer-controller: reconcile purchase %s: %v", key, err) + c.purchaseQueue.AddRateLimited(key) + return true + } + + c.purchaseQueue.Forget(key) + return true +} + func (c *Controller) reconcileOffer(ctx context.Context, key string) error { namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { diff --git 
a/internal/serviceoffercontroller/purchase.go b/internal/serviceoffercontroller/purchase.go new file mode 100644 index 00000000..67b17111 --- /dev/null +++ b/internal/serviceoffercontroller/purchase.go @@ -0,0 +1,306 @@ +package serviceoffercontroller + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "strings" + "time" + + "github.com/ObolNetwork/obol-stack/internal/monetizeapi" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" +) + +const purchaseRequestFinalizer = "obol.org/purchase-finalizer" + +// ── PurchaseRequest reconciler ────────────────────────────────────────────── + +func (c *Controller) reconcilePurchase(ctx context.Context, key string) error { + ns, name, _ := strings.Cut(key, "/") + + raw, err := c.dynClient.Resource(monetizeapi.PurchaseRequestGVR).Namespace(ns).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return nil // deleted + } + + var pr monetizeapi.PurchaseRequest + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(raw.Object, &pr); err != nil { + return fmt.Errorf("unmarshal PurchaseRequest: %w", err) + } + + // Add finalizer if missing. + if !hasStringInSlice(raw.GetFinalizers(), purchaseRequestFinalizer) { + patched := raw.DeepCopy() + patched.SetFinalizers(append(patched.GetFinalizers(), purchaseRequestFinalizer)) + if _, err := c.dynClient.Resource(monetizeapi.PurchaseRequestGVR).Namespace(ns).Update(ctx, patched, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("add finalizer: %w", err) + } + } + + // Handle deletion. + if raw.GetDeletionTimestamp() != nil { + return c.reconcileDeletingPurchase(ctx, &pr, raw) + } + + status := pr.Status + status.Conditions = append([]monetizeapi.Condition{}, pr.Status.Conditions...) 
+ + // Stage 1: Probe + if err := c.reconcilePurchaseProbe(ctx, &status, &pr); err != nil { + log.Printf("purchase %s/%s: probe failed: %v", ns, name, err) + } + + // Stage 2: Sign auths + if purchaseConditionIsTrue(status.Conditions, "Probed") { + if err := c.reconcilePurchaseSign(ctx, &status, &pr); err != nil { + log.Printf("purchase %s/%s: sign failed: %v", ns, name, err) + } + } + + // Stage 3: Configure sidecar + if purchaseConditionIsTrue(status.Conditions, "AuthsSigned") { + if err := c.reconcilePurchaseConfigure(ctx, &status, &pr); err != nil { + log.Printf("purchase %s/%s: configure failed: %v", ns, name, err) + } + } + + // Stage 4: Verify sidecar loaded + if purchaseConditionIsTrue(status.Conditions, "Configured") { + c.reconcilePurchaseReady(ctx, &status, &pr) + } + + return c.updatePurchaseStatus(ctx, raw, &status) +} + +func (c *Controller) reconcileDeletingPurchase(ctx context.Context, pr *monetizeapi.PurchaseRequest, raw *unstructured.Unstructured) error { + buyerNS := pr.EffectiveBuyerNamespace() + c.removeLiteLLMModelEntry(ctx, buyerNS, "paid/"+pr.Spec.Model) + c.removeBuyerUpstream(ctx, buyerNS, pr.Name) + + patched := raw.DeepCopy() + fins := patched.GetFinalizers() + filtered := fins[:0] + for _, f := range fins { + if f != purchaseRequestFinalizer { + filtered = append(filtered, f) + } + } + patched.SetFinalizers(filtered) + _, err := c.dynClient.Resource(monetizeapi.PurchaseRequestGVR).Namespace(pr.Namespace).Update(ctx, patched, metav1.UpdateOptions{}) + return err +} + +// ── Stage 1: Probe ────────────────────────────────────────────────────────── + +func (c *Controller) reconcilePurchaseProbe(ctx context.Context, status *monetizeapi.PurchaseRequestStatus, pr *monetizeapi.PurchaseRequest) error { + client := &http.Client{Timeout: 15 * time.Second} + + body := `{"model":"probe","messages":[{"role":"user","content":"probe"}],"max_tokens":1}` + req, err := http.NewRequestWithContext(ctx, "POST", pr.Spec.Endpoint, strings.NewReader(body)) + 
if err != nil { + setPurchaseCondition(&status.Conditions, "Probed", "False", "ProbeError", err.Error()) + return err + } + req.Header.Set("Content-Type", "application/json") + + resp, err := client.Do(req) + if err != nil { + setPurchaseCondition(&status.Conditions, "Probed", "False", "ProbeError", err.Error()) + return err + } + defer resp.Body.Close() + respBody, _ := io.ReadAll(resp.Body) + + if resp.StatusCode != http.StatusPaymentRequired { + setPurchaseCondition(&status.Conditions, "Probed", "False", "NotPaymentGated", + fmt.Sprintf("expected 402, got %d", resp.StatusCode)) + return fmt.Errorf("expected 402, got %d", resp.StatusCode) + } + + var pricing struct { + Accepts []struct { + PayTo string `json:"payTo"` + MaxAmountRequired string `json:"maxAmountRequired"` + Network string `json:"network"` + } `json:"accepts"` + } + if err := json.Unmarshal(respBody, &pricing); err != nil || len(pricing.Accepts) == 0 { + setPurchaseCondition(&status.Conditions, "Probed", "False", "InvalidPricing", "402 body missing accepts") + return fmt.Errorf("invalid 402 response") + } + + accept := pricing.Accepts[0] + if accept.MaxAmountRequired != pr.Spec.Payment.Price { + setPurchaseCondition(&status.Conditions, "Probed", "False", "PricingMismatch", + fmt.Sprintf("spec.price=%s but endpoint wants %s", pr.Spec.Payment.Price, accept.MaxAmountRequired)) + return fmt.Errorf("pricing mismatch") + } + + status.ProbedAt = time.Now().UTC().Format(time.RFC3339) + status.ProbedPrice = accept.MaxAmountRequired + setPurchaseCondition(&status.Conditions, "Probed", "True", "Validated", + fmt.Sprintf("402: %s micro-USDC on %s", accept.MaxAmountRequired, accept.Network)) + return nil +} + +// ── Stage 2: Read pre-signed auths from spec ──────────────────────────────── +// +// buy.py signs the auths locally (it has remote-signer access in the same +// namespace) and embeds them in spec.preSignedAuths. The controller reads +// them directly from the CR — no cross-namespace Secret read needed. 
+ +func (c *Controller) reconcilePurchaseSign(ctx context.Context, status *monetizeapi.PurchaseRequestStatus, pr *monetizeapi.PurchaseRequest) error { + if len(pr.Spec.PreSignedAuths) == 0 { + setPurchaseCondition(&status.Conditions, "AuthsSigned", "False", "NoAuths", + "spec.preSignedAuths is empty — buy.py should embed auths in the CR") + return fmt.Errorf("no pre-signed auths in spec") + } + + // Convert typed auths to map format for the buyer ConfigMap. + auths := make([]map[string]string, len(pr.Spec.PreSignedAuths)) + for i, a := range pr.Spec.PreSignedAuths { + auths[i] = map[string]string{ + "signature": a.Signature, + "from": a.From, + "to": a.To, + "value": a.Value, + "validAfter": a.ValidAfter, + "validBefore": a.ValidBefore, + "nonce": a.Nonce, + } + } + + if pr.Spec.PreSignedAuths[0].From != "" { + status.SignerAddress = pr.Spec.PreSignedAuths[0].From + } + + c.pendingAuths.Store(pr.Namespace+"/"+pr.Name, auths) + status.TotalSigned += len(auths) + setPurchaseCondition(&status.Conditions, "AuthsSigned", "True", "Loaded", + fmt.Sprintf("Loaded %d pre-signed auths from spec", len(auths))) + return nil +} + +// ── Stage 3: Configure sidecar ────────────────────────────────────────────── + +func (c *Controller) reconcilePurchaseConfigure(ctx context.Context, status *monetizeapi.PurchaseRequestStatus, pr *monetizeapi.PurchaseRequest) error { + key := pr.Namespace + "/" + pr.Name + authsRaw, ok := c.pendingAuths.Load(key) + if !ok { + setPurchaseCondition(&status.Conditions, "Configured", "False", "NoAuths", "No pending auths to write") + return fmt.Errorf("no pending auths") + } + auths := authsRaw.([]map[string]string) + c.pendingAuths.Delete(key) + + buyerNS := pr.EffectiveBuyerNamespace() + + upstream := map[string]any{ + "url": pr.Spec.Endpoint, + "network": pr.Spec.Payment.Network, + "payTo": pr.Spec.Payment.PayTo, + "price": pr.Spec.Payment.Price, + "asset": pr.Spec.Payment.Asset, + "remoteModel": pr.Spec.Model, + } + + if err := 
c.mergeBuyerConfig(ctx, buyerNS, pr.Name, upstream); err != nil { + setPurchaseCondition(&status.Conditions, "Configured", "False", "ConfigWriteError", err.Error()) + return err + } + + if err := c.mergeBuyerAuths(ctx, buyerNS, pr.Name, auths); err != nil { + setPurchaseCondition(&status.Conditions, "Configured", "False", "AuthsWriteError", err.Error()) + return err + } + + // Trigger immediate sidecar reload so it picks up the new config/auths + // without waiting for the 5-second ticker. + c.triggerBuyerReload(ctx, buyerNS) + + // Hot-add via /model/new API — no pod restart needed. + paidModel := "paid/" + pr.Spec.Model + c.addLiteLLMModelEntry(ctx, buyerNS, paidModel) + + status.Remaining = len(auths) + status.PublicModel = paidModel + setPurchaseCondition(&status.Conditions, "Configured", "True", "Written", + fmt.Sprintf("Wrote %d auths to %s/x402-buyer-auths", len(auths), buyerNS)) + return nil +} + +// ── Stage 4: Ready ────────────────────────────────────────────────────────── + +func (c *Controller) reconcilePurchaseReady(ctx context.Context, status *monetizeapi.PurchaseRequestStatus, pr *monetizeapi.PurchaseRequest) { + buyerNS := pr.EffectiveBuyerNamespace() + + remaining, spent, err := c.checkBuyerStatus(ctx, buyerNS, pr.Name) + if err != nil { + setPurchaseCondition(&status.Conditions, "Ready", "False", "SidecarNotReady", err.Error()) + return + } + + status.Remaining = remaining + status.Spent = spent + setPurchaseCondition(&status.Conditions, "Ready", "True", "Reconciled", + fmt.Sprintf("Sidecar: %d remaining, %d spent", remaining, spent)) +} + +// ── Status helpers ────────────────────────────────────────────────────────── + +func (c *Controller) updatePurchaseStatus(ctx context.Context, raw *unstructured.Unstructured, status *monetizeapi.PurchaseRequestStatus) error { + statusMap, _ := json.Marshal(status) + var statusObj map[string]any + json.Unmarshal(statusMap, &statusObj) + + raw.Object["status"] = statusObj + _, err := 
c.dynClient.Resource(monetizeapi.PurchaseRequestGVR). + Namespace(raw.GetNamespace()). + UpdateStatus(ctx, raw, metav1.UpdateOptions{}) + return err +} + +func hasStringInSlice(slice []string, target string) bool { + for _, s := range slice { + if s == target { + return true + } + } + return false +} + +func purchaseConditionIsTrue(conditions []monetizeapi.Condition, condType string) bool { + for _, c := range conditions { + if c.Type == condType { + return c.Status == "True" + } + } + return false +} + +func setPurchaseCondition(conditions *[]monetizeapi.Condition, condType, status, reason, message string) { + now := metav1.Now() + for i, c := range *conditions { + if c.Type == condType { + if c.Status != status { + (*conditions)[i].LastTransitionTime = now + } + (*conditions)[i].Status = status + (*conditions)[i].Reason = reason + (*conditions)[i].Message = message + return + } + } + *conditions = append(*conditions, monetizeapi.Condition{ + Type: condType, + Status: status, + Reason: reason, + Message: message, + LastTransitionTime: now, + }) +} diff --git a/internal/serviceoffercontroller/purchase_helpers.go b/internal/serviceoffercontroller/purchase_helpers.go new file mode 100644 index 00000000..f6336f10 --- /dev/null +++ b/internal/serviceoffercontroller/purchase_helpers.go @@ -0,0 +1,475 @@ +package serviceoffercontroller + +import ( + "bytes" + "context" + "crypto/rand" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "strconv" + "strings" + "time" + + "github.com/ObolNetwork/obol-stack/internal/monetizeapi" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + buyerConfigCM = "x402-buyer-config" + buyerAuthsCM = "x402-buyer-auths" +) + +// ── ConfigMap merge (optimistic concurrency) ──────────────────────────────── + +func (c *Controller) mergeBuyerConfig(ctx context.Context, ns, name string, upstream map[string]any) error { + cm, err := c.kubeClient.CoreV1().ConfigMaps(ns).Get(ctx, buyerConfigCM, metav1.GetOptions{}) + 
if err != nil { + return fmt.Errorf("get %s/%s: %w", ns, buyerConfigCM, err) + } + + // Parse existing config. + var config struct { + Upstreams map[string]any `json:"upstreams"` + } + if raw, ok := cm.Data["config.json"]; ok { + json.Unmarshal([]byte(raw), &config) + } + if config.Upstreams == nil { + config.Upstreams = make(map[string]any) + } + + // Merge the new upstream. + config.Upstreams[name] = upstream + + configJSON, _ := json.MarshalIndent(config, "", " ") + cm.Data["config.json"] = string(configJSON) + + _, err = c.kubeClient.CoreV1().ConfigMaps(ns).Update(ctx, cm, metav1.UpdateOptions{}) + return err +} + +func (c *Controller) mergeBuyerAuths(ctx context.Context, ns, name string, auths []map[string]string) error { + cm, err := c.kubeClient.CoreV1().ConfigMaps(ns).Get(ctx, buyerAuthsCM, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("get %s/%s: %w", ns, buyerAuthsCM, err) + } + + // Parse existing auths. + var allAuths map[string]any + if raw, ok := cm.Data["auths.json"]; ok { + json.Unmarshal([]byte(raw), &allAuths) + } + if allAuths == nil { + allAuths = make(map[string]any) + } + + // Set auths for this upstream (replace, not append). + allAuths[name] = auths + + authsJSON, _ := json.MarshalIndent(allAuths, "", " ") + cm.Data["auths.json"] = string(authsJSON) + + _, err = c.kubeClient.CoreV1().ConfigMaps(ns).Update(ctx, cm, metav1.UpdateOptions{}) + return err +} + +func (c *Controller) removeBuyerUpstream(ctx context.Context, ns, name string) { + // Remove from config. 
+ cm, err := c.kubeClient.CoreV1().ConfigMaps(ns).Get(ctx, buyerConfigCM, metav1.GetOptions{}) + if err == nil { + var config struct { + Upstreams map[string]any `json:"upstreams"` + } + if raw, ok := cm.Data["config.json"]; ok { + json.Unmarshal([]byte(raw), &config) + } + if config.Upstreams != nil { + delete(config.Upstreams, name) + configJSON, _ := json.MarshalIndent(config, "", " ") + cm.Data["config.json"] = string(configJSON) + c.kubeClient.CoreV1().ConfigMaps(ns).Update(ctx, cm, metav1.UpdateOptions{}) + } + } + + // Remove from auths. + authsCM, err := c.kubeClient.CoreV1().ConfigMaps(ns).Get(ctx, buyerAuthsCM, metav1.GetOptions{}) + if err == nil { + var allAuths map[string]any + if raw, ok := authsCM.Data["auths.json"]; ok { + json.Unmarshal([]byte(raw), &allAuths) + } + if allAuths != nil { + delete(allAuths, name) + authsJSON, _ := json.MarshalIndent(allAuths, "", " ") + authsCM.Data["auths.json"] = string(authsJSON) + c.kubeClient.CoreV1().ConfigMaps(ns).Update(ctx, authsCM, metav1.UpdateOptions{}) + } + } +} + +// getLiteLLMMasterKey reads the LITELLM_MASTER_KEY from the litellm-secrets +// Secret in the given namespace. +func (c *Controller) getLiteLLMMasterKey(ctx context.Context, ns string) (string, error) { + secret, err := c.kubeClient.CoreV1().Secrets(ns).Get(ctx, "litellm-secrets", metav1.GetOptions{}) + if err != nil { + return "", fmt.Errorf("get litellm-secrets: %w", err) + } + key, ok := secret.Data["LITELLM_MASTER_KEY"] + if !ok { + return "", fmt.Errorf("LITELLM_MASTER_KEY not found in litellm-secrets") + } + return string(key), nil +} + +// litellmBaseURL returns the in-cluster base URL for the LiteLLM service in +// the given namespace. The controller field litellmURLOverride, when set, +// takes precedence (used in tests). 
+func (c *Controller) litellmBaseURL(ns string) string { + if c.litellmURLOverride != "" { + return c.litellmURLOverride + } + return fmt.Sprintf("http://litellm.%s.svc.cluster.local:4000", ns) +} + +// addLiteLLMModelEntry adds a model entry to the running LiteLLM router via +// the /model/new HTTP API. This avoids the fragile read-modify-write cycle +// on the ConfigMap and does not require a pod restart. +func (c *Controller) addLiteLLMModelEntry(ctx context.Context, ns, modelName string) { + masterKey, err := c.getLiteLLMMasterKey(ctx, ns) + if err != nil { + log.Printf("purchase: failed to get LiteLLM master key: %v", err) + return + } + + body := map[string]any{ + "model_name": modelName, + "litellm_params": map[string]any{ + "model": "openai/" + modelName, + "api_base": "http://127.0.0.1:8402", + "api_key": "unused", + }, + } + bodyJSON, err := json.Marshal(body) + if err != nil { + log.Printf("purchase: failed to marshal model request: %v", err) + return + } + + url := c.litellmBaseURL(ns) + "/model/new" + reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + req, err := http.NewRequestWithContext(reqCtx, "POST", url, bytes.NewReader(bodyJSON)) + if err != nil { + log.Printf("purchase: failed to create model request: %v", err) + return + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+masterKey) + + resp, err := c.httpClient.Do(req) + if err != nil { + log.Printf("purchase: LiteLLM /model/new failed for %s: %v", modelName, err) + return + } + defer resp.Body.Close() + + if resp.StatusCode >= 300 { + respBody, _ := io.ReadAll(resp.Body) + log.Printf("purchase: LiteLLM /model/new returned %d for %s: %s", + resp.StatusCode, modelName, strings.TrimSpace(string(respBody))) + return + } + + log.Printf("purchase: added LiteLLM model %s via API", modelName) +} + +// removeLiteLLMModelEntry removes a model entry from the running LiteLLM +// router via the /model/delete HTTP API. 
It queries /model/info to resolve +// the internal model_id, then deletes by ID. Best-effort: logs errors but +// does not fail the reconcile. +func (c *Controller) removeLiteLLMModelEntry(ctx context.Context, ns, modelName string) { + masterKey, err := c.getLiteLLMMasterKey(ctx, ns) + if err != nil { + log.Printf("purchase: remove model: failed to get master key: %v", err) + return + } + + infoURL := c.litellmBaseURL(ns) + "/model/info" + reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + req, err := http.NewRequestWithContext(reqCtx, "GET", infoURL, nil) + if err != nil { + log.Printf("purchase: remove model: request error: %v", err) + return + } + req.Header.Set("Authorization", "Bearer "+masterKey) + + resp, err := c.httpClient.Do(req) + if err != nil { + log.Printf("purchase: remove model: /model/info failed: %v", err) + return + } + defer resp.Body.Close() + + var infoResp struct { + Data []struct { + ModelName string `json:"model_name"` + ModelInfo struct { + ID string `json:"id"` + } `json:"model_info"` + } `json:"data"` + } + if err := json.NewDecoder(resp.Body).Decode(&infoResp); err != nil { + log.Printf("purchase: remove model: parse /model/info: %v", err) + return + } + + for _, m := range infoResp.Data { + if m.ModelName != modelName { + continue + } + c.deleteLiteLLMModel(ctx, ns, masterKey, m.ModelInfo.ID, modelName) + } +} + +func (c *Controller) deleteLiteLLMModel(ctx context.Context, ns, masterKey, modelID, modelName string) { + body := map[string]any{"id": modelID} + bodyJSON, _ := json.Marshal(body) + + url := c.litellmBaseURL(ns) + "/model/delete" + reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + req, err := http.NewRequestWithContext(reqCtx, "POST", url, bytes.NewReader(bodyJSON)) + if err != nil { + log.Printf("purchase: delete model request error: %v", err) + return + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+masterKey) + + 
resp, err := c.httpClient.Do(req) + if err != nil { + log.Printf("purchase: /model/delete failed for %s: %v", modelName, err) + return + } + defer resp.Body.Close() + + if resp.StatusCode >= 300 { + respBody, _ := io.ReadAll(resp.Body) + log.Printf("purchase: /model/delete returned %d for %s: %s", + resp.StatusCode, modelName, strings.TrimSpace(string(respBody))) + return + } + + log.Printf("purchase: removed LiteLLM model %s (id=%s) via API", modelName, modelID) +} + +// triggerBuyerReload sends POST /admin/reload to the x402-buyer sidecar +// on all running litellm pods. Best-effort — the sidecar reloads on its +// own 5-second timer anyway. +func (c *Controller) triggerBuyerReload(ctx context.Context, ns string) { + pods, err := c.kubeClient.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{ + LabelSelector: "app=litellm", + }) + if err != nil || len(pods.Items) == 0 { + return + } + + for _, pod := range pods.Items { + if pod.Status.Phase != "Running" || pod.Status.PodIP == "" { + continue + } + reloadURL := fmt.Sprintf("http://%s:8402/admin/reload", pod.Status.PodIP) + reqCtx, cancel := context.WithTimeout(ctx, 3*time.Second) + req, _ := http.NewRequestWithContext(reqCtx, "POST", reloadURL, nil) + c.httpClient.Do(req) //nolint:bodyclose // best-effort, response ignored + cancel() + } +} + +// ── Sidecar status check ──────────────────────────────────────────────────── + +func (c *Controller) checkBuyerStatus(ctx context.Context, ns, name string) (remaining, spent int, err error) { + // List LiteLLM pods to get a pod IP. 
+ pods, err := c.kubeClient.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{ + LabelSelector: "app=litellm", + }) + if err != nil || len(pods.Items) == 0 { + return 0, 0, fmt.Errorf("no litellm pods in %s", ns) + } + + for _, pod := range pods.Items { + if pod.Status.Phase != "Running" || pod.Status.PodIP == "" { + continue + } + + client := &http.Client{Timeout: 5 * time.Second} + resp, err := client.Get(fmt.Sprintf("http://%s:8402/status", pod.Status.PodIP)) + if err != nil { + continue + } + defer resp.Body.Close() + body, _ := io.ReadAll(resp.Body) + + var status map[string]struct { + Remaining int `json:"remaining"` + Spent int `json:"spent"` + } + if err := json.Unmarshal(body, &status); err != nil { + continue + } + + if info, ok := status[name]; ok { + return info.Remaining, info.Spent, nil + } + } + + return 0, 0, fmt.Errorf("upstream %q not found in sidecar status", name) +} + +// ── ERC-3009 typed data builder ───────────────────────────────────────────── + +func (c *Controller) getSignerAddress(ctx context.Context, signerURL string) (string, error) { + client := &http.Client{Timeout: 5 * time.Second} + req, err := http.NewRequestWithContext(ctx, "GET", signerURL+"/api/v1/keys", nil) + if err != nil { + return "", err + } + resp, err := client.Do(req) + if err != nil { + return "", fmt.Errorf("remote-signer unreachable: %w", err) + } + defer resp.Body.Close() + + // The remote-signer returns keys as a string array: {"keys": ["0x..."]} + var result struct { + Keys []string `json:"keys"` + } + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil || len(result.Keys) == 0 { + return "", fmt.Errorf("no signing keys in remote-signer") + } + return result.Keys[0], nil +} + +func (c *Controller) signAuths(ctx context.Context, signerURL, fromAddr string, pr *monetizeapi.PurchaseRequest) ([]map[string]string, error) { + client := &http.Client{Timeout: 30 * time.Second} + auths := make([]map[string]string, 0, pr.Spec.Count) + chainID := 
chainIDFromNetwork(pr.Spec.Payment.Network) + + for i := 0; i < pr.Spec.Count; i++ { + nonce := randomNonce() + validBefore := "4294967295" + + typedData := buildERC3009TypedData( + fromAddr, pr.Spec.Payment.PayTo, pr.Spec.Payment.Price, + validBefore, nonce, chainID, pr.Spec.Payment.Asset, + ) + + body, _ := json.Marshal(map[string]any{"typed_data": typedData}) + req, err := http.NewRequestWithContext(ctx, "POST", + fmt.Sprintf("%s/api/v1/sign/%s/typed-data", signerURL, fromAddr), + io.NopCloser(strings.NewReader(string(body)))) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("sign auth %d: %w", i+1, err) + } + + var signResult struct { + Signature string `json:"signature"` + } + json.NewDecoder(resp.Body).Decode(&signResult) + resp.Body.Close() + + if signResult.Signature == "" { + return nil, fmt.Errorf("sign auth %d: empty signature", i+1) + } + + auths = append(auths, map[string]string{ + "signature": signResult.Signature, + "from": fromAddr, + "to": pr.Spec.Payment.PayTo, + "value": pr.Spec.Payment.Price, + "validAfter": "0", + "validBefore": validBefore, + "nonce": nonce, + }) + } + return auths, nil +} + +func buildERC3009TypedData(from, to, value, validBefore, nonce string, chainID int, usdcAddr string) map[string]any { + return map[string]any{ + "types": map[string]any{ + "EIP712Domain": []map[string]string{ + {"name": "name", "type": "string"}, + {"name": "version", "type": "string"}, + {"name": "chainId", "type": "uint256"}, + {"name": "verifyingContract", "type": "address"}, + }, + "TransferWithAuthorization": []map[string]string{ + {"name": "from", "type": "address"}, + {"name": "to", "type": "address"}, + {"name": "value", "type": "uint256"}, + {"name": "validAfter", "type": "uint256"}, + {"name": "validBefore", "type": "uint256"}, + {"name": "nonce", "type": "bytes32"}, + }, + }, + "primaryType": "TransferWithAuthorization", + 
"domain": map[string]any{ + "name": "USDC", + "version": "2", + "chainId": strconv.Itoa(chainID), + "verifyingContract": usdcAddr, + }, + "message": map[string]any{ + "from": from, + "to": to, + "value": value, + "validAfter": "0", + "validBefore": validBefore, + "nonce": nonce, + }, + } +} + +func randomNonce() string { + b := make([]byte, 32) + rand.Read(b) + return "0x" + hex.EncodeToString(b) +} + +func chainIDFromNetwork(network string) int { + switch network { + case "base-sepolia": + return 84532 + case "base": + return 8453 + case "mainnet", "ethereum": + return 1 + default: + return 84532 + } +} + +// ── Condition helpers ─────────────────────────────────────────────────────── + +func conditionIsTrue(conditions []monetizeapi.Condition, condType string) bool { + for _, c := range conditions { + if c.Type == condType { + return c.Status == "True" + } + } + return false +} diff --git a/internal/serviceoffercontroller/purchase_helpers_test.go b/internal/serviceoffercontroller/purchase_helpers_test.go new file mode 100644 index 00000000..89a32328 --- /dev/null +++ b/internal/serviceoffercontroller/purchase_helpers_test.go @@ -0,0 +1,302 @@ +package serviceoffercontroller + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "sync/atomic" + "testing" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" +) + +func newTestControllerWithSecret(ns, masterKey string) *Controller { + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "litellm-secrets", + Namespace: ns, + }, + Data: map[string][]byte{ + "LITELLM_MASTER_KEY": []byte(masterKey), + }, + } + kubeClient := fake.NewSimpleClientset(secret) + return &Controller{ + kubeClient: kubeClient, + httpClient: &http.Client{}, + } +} + +func TestGetLiteLLMMasterKey(t *testing.T) { + c := newTestControllerWithSecret("llm", "sk-obol-test-key") + + key, err := c.getLiteLLMMasterKey(context.Background(), "llm") + if err 
!= nil { + t.Fatalf("unexpected error: %v", err) + } + if key != "sk-obol-test-key" { + t.Fatalf("key = %q, want %q", key, "sk-obol-test-key") + } +} + +func TestGetLiteLLMMasterKeyMissingSecret(t *testing.T) { + kubeClient := fake.NewSimpleClientset() + c := &Controller{kubeClient: kubeClient} + + _, err := c.getLiteLLMMasterKey(context.Background(), "llm") + if err == nil { + t.Fatal("expected error for missing secret, got nil") + } +} + +func TestGetLiteLLMMasterKeyMissingKey(t *testing.T) { + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "litellm-secrets", + Namespace: "llm", + }, + Data: map[string][]byte{ + "OTHER_KEY": []byte("value"), + }, + } + kubeClient := fake.NewSimpleClientset(secret) + c := &Controller{kubeClient: kubeClient} + + _, err := c.getLiteLLMMasterKey(context.Background(), "llm") + if err == nil { + t.Fatal("expected error for missing LITELLM_MASTER_KEY, got nil") + } +} + +func TestLiteLLMBaseURL(t *testing.T) { + c := &Controller{} + + url := c.litellmBaseURL("llm") + if url != "http://litellm.llm.svc.cluster.local:4000" { + t.Fatalf("url = %q, want %q", url, "http://litellm.llm.svc.cluster.local:4000") + } + + c.litellmURLOverride = "http://localhost:9999" + url = c.litellmBaseURL("llm") + if url != "http://localhost:9999" { + t.Fatalf("url = %q, want %q", url, "http://localhost:9999") + } +} + +func TestAddLiteLLMModelEntryViaAPI(t *testing.T) { + var ( + gotAuth string + gotBody map[string]any + callCount atomic.Int32 + ) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/model/new" { + w.WriteHeader(http.StatusNotFound) + return + } + if r.Method != "POST" { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + callCount.Add(1) + gotAuth = r.Header.Get("Authorization") + + json.NewDecoder(r.Body).Decode(&gotBody) + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]any{ + "model_id": "test-uuid-123", + "model_name": 
gotBody["model_name"], + }) + })) + defer server.Close() + + c := newTestControllerWithSecret("llm", "sk-obol-test-key") + c.litellmURLOverride = server.URL + + c.addLiteLLMModelEntry(context.Background(), "llm", "paid/qwen3.5:9b") + + if callCount.Load() != 1 { + t.Fatalf("expected 1 call to /model/new, got %d", callCount.Load()) + } + + if gotAuth != "Bearer sk-obol-test-key" { + t.Fatalf("Authorization = %q, want %q", gotAuth, "Bearer sk-obol-test-key") + } + + if gotBody["model_name"] != "paid/qwen3.5:9b" { + t.Fatalf("model_name = %v, want %q", gotBody["model_name"], "paid/qwen3.5:9b") + } + + params, ok := gotBody["litellm_params"].(map[string]any) + if !ok { + t.Fatal("litellm_params missing or wrong type") + } + if params["model"] != "openai/paid/qwen3.5:9b" { + t.Fatalf("litellm_params.model = %v, want %q", params["model"], "openai/paid/qwen3.5:9b") + } + if params["api_base"] != "http://127.0.0.1:8402" { + t.Fatalf("litellm_params.api_base = %v, want %q", params["api_base"], "http://127.0.0.1:8402") + } + if params["api_key"] != "unused" { + t.Fatalf("litellm_params.api_key = %v, want %q", params["api_key"], "unused") + } +} + +func TestAddLiteLLMModelEntryHandlesServerError(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(`{"error": "internal server error"}`)) + })) + defer server.Close() + + c := newTestControllerWithSecret("llm", "sk-obol-test-key") + c.litellmURLOverride = server.URL + + // Should not panic; best-effort, logs the error. + c.addLiteLLMModelEntry(context.Background(), "llm", "paid/test-model") +} + +func TestAddLiteLLMModelEntryHandlesMissingSecret(t *testing.T) { + kubeClient := fake.NewSimpleClientset() + c := &Controller{ + kubeClient: kubeClient, + httpClient: &http.Client{}, + litellmURLOverride: "http://localhost:1234", + } + + // Should not panic; the function logs and returns on missing secret. 
+ c.addLiteLLMModelEntry(context.Background(), "llm", "paid/test-model") +} + +// ── removeLiteLLMModelEntry tests ────────────────────────────────────────── + +func TestRemoveLiteLLMModelEntry(t *testing.T) { + var ( + infoRequested atomic.Bool + deleteRequested atomic.Bool + deletedID string + ) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/model/info": + infoRequested.Store(true) + json.NewEncoder(w).Encode(map[string]any{ + "data": []map[string]any{ + { + "model_name": "paid/qwen3.5:9b", + "model_info": map[string]any{"id": "model-uuid-abc"}, + }, + { + "model_name": "other-model", + "model_info": map[string]any{"id": "model-uuid-xyz"}, + }, + }, + }) + case "/model/delete": + deleteRequested.Store(true) + var body map[string]any + json.NewDecoder(r.Body).Decode(&body) + deletedID = body["id"].(string) + w.WriteHeader(http.StatusOK) + default: + w.WriteHeader(http.StatusNotFound) + } + })) + defer server.Close() + + c := newTestControllerWithSecret("llm", "sk-obol-test-key") + c.litellmURLOverride = server.URL + + c.removeLiteLLMModelEntry(context.Background(), "llm", "paid/qwen3.5:9b") + + if !infoRequested.Load() { + t.Fatal("expected GET /model/info to be called") + } + if !deleteRequested.Load() { + t.Fatal("expected POST /model/delete to be called") + } + if deletedID != "model-uuid-abc" { + t.Fatalf("deleted ID = %q, want model-uuid-abc", deletedID) + } +} + +func TestRemoveLiteLLMModelEntryNoMatch(t *testing.T) { + var deleteRequested atomic.Bool + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/model/info": + json.NewEncoder(w).Encode(map[string]any{ + "data": []map[string]any{ + { + "model_name": "other-model", + "model_info": map[string]any{"id": "model-uuid-xyz"}, + }, + }, + }) + case "/model/delete": + deleteRequested.Store(true) + w.WriteHeader(http.StatusOK) + default: + 
w.WriteHeader(http.StatusNotFound) + } + })) + defer server.Close() + + c := newTestControllerWithSecret("llm", "sk-obol-test-key") + c.litellmURLOverride = server.URL + + c.removeLiteLLMModelEntry(context.Background(), "llm", "paid/nonexistent") + + if deleteRequested.Load() { + t.Fatal("expected /model/delete NOT to be called when model doesn't match") + } +} + +func TestRemoveLiteLLMModelEntryServerError(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(`{"error":"internal server error"}`)) + })) + defer server.Close() + + c := newTestControllerWithSecret("llm", "sk-obol-test-key") + c.litellmURLOverride = server.URL + + // Should not panic; best-effort, logs the error. + c.removeLiteLLMModelEntry(context.Background(), "llm", "paid/test-model") +} + +// ── triggerBuyerReload tests ─────────────────────────────────────────────── + +func TestTriggerBuyerReload(t *testing.T) { + var reloadCount atomic.Int32 + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/admin/reload" && r.Method == "POST" { + reloadCount.Add(1) + w.WriteHeader(http.StatusOK) + return + } + w.WriteHeader(http.StatusNotFound) + })) + defer server.Close() + + // The triggerBuyerReload hits pod IPs directly, not the LiteLLM service. + // We can't easily test the pod discovery with fake client, but we can + // verify the function doesn't panic with no pods. + kubeClient := fake.NewSimpleClientset() + c := &Controller{ + kubeClient: kubeClient, + httpClient: &http.Client{}, + } + + // Should not panic with no pods. 
+ c.triggerBuyerReload(context.Background(), "llm") +} diff --git a/internal/x402/buyer/config.go b/internal/x402/buyer/config.go index f807ef59..f80135e5 100644 --- a/internal/x402/buyer/config.go +++ b/internal/x402/buyer/config.go @@ -12,6 +12,8 @@ import ( "encoding/json" "fmt" "os" + "path/filepath" + "strings" ) // Config is the top-level sidecar configuration, loaded from a JSON file @@ -97,3 +99,67 @@ func LoadAuths(path string) (AuthsFile, error) { return auths, nil } + +// LoadConfigDir reads per-upstream config files from a directory. Each *.json +// file is one upstream, keyed by the filename stem (e.g. "42.json" → key "42"). +// This is the SSA-compatible format where the controller applies one key per +// PurchaseRequest via Server-Side Apply. +func LoadConfigDir(dir string) (*Config, error) { + entries, err := os.ReadDir(dir) + if err != nil { + return nil, fmt.Errorf("read config dir %s: %w", dir, err) + } + + cfg := &Config{Upstreams: make(map[string]UpstreamConfig)} + for _, e := range entries { + if e.IsDir() || !strings.HasSuffix(e.Name(), ".json") { + continue + } + + name := strings.TrimSuffix(e.Name(), ".json") + data, err := os.ReadFile(filepath.Join(dir, e.Name())) + if err != nil { + continue + } + + var upstream UpstreamConfig + if err := json.Unmarshal(data, &upstream); err != nil { + continue + } + + cfg.Upstreams[name] = upstream + } + + return cfg, nil +} + +// LoadAuthsDir reads per-upstream auth files from a directory. Each *.json +// file contains an array of PreSignedAuth for one upstream. 
+func LoadAuthsDir(dir string) (AuthsFile, error) { + entries, err := os.ReadDir(dir) + if err != nil { + return nil, fmt.Errorf("read auths dir %s: %w", dir, err) + } + + auths := make(AuthsFile) + for _, e := range entries { + if e.IsDir() || !strings.HasSuffix(e.Name(), ".json") { + continue + } + + name := strings.TrimSuffix(e.Name(), ".json") + data, err := os.ReadFile(filepath.Join(dir, e.Name())) + if err != nil { + continue + } + + var pool []*PreSignedAuth + if err := json.Unmarshal(data, &pool); err != nil { + continue + } + + auths[name] = pool + } + + return auths, nil +} diff --git a/internal/x402/buyer/proxy.go b/internal/x402/buyer/proxy.go index 5aa0aaa8..3331a422 100644 --- a/internal/x402/buyer/proxy.go +++ b/internal/x402/buyer/proxy.go @@ -32,6 +32,7 @@ type Proxy struct { mux *http.ServeMux metrics *metrics state *StateStore + reloadCh chan struct{} // signals an immediate config reload } type upstreamEntry struct { @@ -67,6 +68,7 @@ func NewProxy(cfg *Config, auths AuthsFile, state *StateStore) (*Proxy, error) { mux: http.NewServeMux(), metrics: newMetrics(), state: state, + reloadCh: make(chan struct{}, 1), } p.mux.HandleFunc("GET /healthz", func(w http.ResponseWriter, r *http.Request) { @@ -74,6 +76,7 @@ func NewProxy(cfg *Config, auths AuthsFile, state *StateStore) (*Proxy, error) { fmt.Fprint(w, "ok") }) p.mux.HandleFunc("GET /status", p.handleStatus) + p.mux.HandleFunc("POST /admin/reload", p.handleAdminReload) p.mux.Handle("GET /metrics", p.metrics.handler()) registerOpenAIRoutes(p.mux, p.handleModelRequest) @@ -161,6 +164,7 @@ func (p *Proxy) syncCompatibilityRoutesLocked() { fmt.Fprint(w, "ok") }) p.mux.HandleFunc("GET /status", p.handleStatus) + p.mux.HandleFunc("POST /admin/reload", p.handleAdminReload) p.mux.Handle("GET /metrics", p.metrics.handler()) registerOpenAIRoutes(p.mux, p.handleModelRequest) @@ -588,6 +592,25 @@ func (p *Proxy) handleStatus(w http.ResponseWriter, r *http.Request) { _ = json.NewEncoder(w).Encode(result) 
//nolint:errchkjson // controlled status map } +// handleAdminReload triggers an immediate config/auth reload from disk. +func (p *Proxy) handleAdminReload(w http.ResponseWriter, _ *http.Request) { + select { + case p.reloadCh <- struct{}{}: + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `{"status":"reload triggered"}`) + default: + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `{"status":"reload already pending"}`) + } +} + +// ReloadCh returns a channel that signals when an immediate reload is requested +// via the /admin/reload endpoint. The main goroutine should select on this +// alongside the periodic ticker. +func (p *Proxy) ReloadCh() <-chan struct{} { + return p.reloadCh +} + // singleJoiningSlash joins a base and suffix path with exactly one slash. func singleJoiningSlash(a, b string) string { aslash := strings.HasSuffix(a, "/") diff --git a/internal/x402/buyer/proxy_test.go b/internal/x402/buyer/proxy_test.go index 62a7348c..0838a6dc 100644 --- a/internal/x402/buyer/proxy_test.go +++ b/internal/x402/buyer/proxy_test.go @@ -1043,3 +1043,69 @@ func writeFile(t *testing.T, path, content string) error { t.Helper() return os.WriteFile(path, []byte(content), 0o644) } + +func TestProxy_AdminReload(t *testing.T) { + cfg := &Config{Upstreams: map[string]UpstreamConfig{}} + auths := AuthsFile{} + + proxy, err := NewProxy(cfg, auths, nil) + if err != nil { + t.Fatalf("NewProxy: %v", err) + } + + rec := httptest.NewRecorder() + proxy.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/admin/reload", nil)) + if rec.Code != http.StatusOK { + t.Errorf("admin/reload: got %d, want 200", rec.Code) + } + + body := rec.Body.String() + if !strings.Contains(body, "reload triggered") { + t.Errorf("body = %q, want 'reload triggered'", body) + } + + // Channel should have a signal. 
+ select { + case <-proxy.ReloadCh(): + // expected + default: + t.Error("expected reload signal on channel") + } +} + +func TestProxy_AdminReloadIdempotent(t *testing.T) { + cfg := &Config{Upstreams: map[string]UpstreamConfig{}} + auths := AuthsFile{} + + proxy, err := NewProxy(cfg, auths, nil) + if err != nil { + t.Fatalf("NewProxy: %v", err) + } + + // First request: should get "reload triggered". + rec1 := httptest.NewRecorder() + proxy.ServeHTTP(rec1, httptest.NewRequest(http.MethodPost, "/admin/reload", nil)) + if rec1.Code != http.StatusOK { + t.Errorf("first admin/reload: got %d, want 200", rec1.Code) + } + + // Second request without draining the channel: "already pending". + rec2 := httptest.NewRecorder() + proxy.ServeHTTP(rec2, httptest.NewRequest(http.MethodPost, "/admin/reload", nil)) + if rec2.Code != http.StatusOK { + t.Errorf("second admin/reload: got %d, want 200", rec2.Code) + } + if !strings.Contains(rec2.Body.String(), "already pending") { + t.Errorf("body = %q, want 'already pending'", rec2.Body.String()) + } + + // Drain the channel. + <-proxy.ReloadCh() + + // Third request: "reload triggered" again. + rec3 := httptest.NewRecorder() + proxy.ServeHTTP(rec3, httptest.NewRequest(http.MethodPost, "/admin/reload", nil)) + if !strings.Contains(rec3.Body.String(), "reload triggered") { + t.Errorf("body = %q, want 'reload triggered'", rec3.Body.String()) + } +} diff --git a/internal/x402/config.go b/internal/x402/config.go index 9df62628..17cac708 100644 --- a/internal/x402/config.go +++ b/internal/x402/config.go @@ -91,7 +91,7 @@ func LoadConfig(path string) (*PricingConfig, error) { // Apply defaults. 
if cfg.FacilitatorURL == "" { - cfg.FacilitatorURL = "https://facilitator.x402.rs" + cfg.FacilitatorURL = DefaultFacilitatorURL } if cfg.Chain == "" { diff --git a/internal/x402/config_test.go b/internal/x402/config_test.go index 8775f544..e351be89 100644 --- a/internal/x402/config_test.go +++ b/internal/x402/config_test.go @@ -89,8 +89,8 @@ routes: t.Errorf("default chain = %q, want base-sepolia", cfg.Chain) } - if cfg.FacilitatorURL != "https://facilitator.x402.rs" { - t.Errorf("default facilitatorURL = %q, want https://facilitator.x402.rs", cfg.FacilitatorURL) + if cfg.FacilitatorURL != "https://x402.gcp.obol.tech" { + t.Errorf("default facilitatorURL = %q, want https://x402.gcp.obol.tech", cfg.FacilitatorURL) } } @@ -194,7 +194,7 @@ func TestValidateFacilitatorURL(t *testing.T) { wantErr bool }{ // HTTPS always allowed. - {"https standard", "https://facilitator.x402.rs", false}, + {"https standard", "https://x402.gcp.obol.tech", false}, {"https custom", "https://my-facilitator.example.com:8443/verify", false}, // Loopback/internal addresses allowed over HTTP. diff --git a/internal/x402/setup.go b/internal/x402/setup.go index 3f377b0d..7c869da4 100644 --- a/internal/x402/setup.go +++ b/internal/x402/setup.go @@ -16,6 +16,10 @@ const ( x402Namespace = "x402" pricingConfigMap = "x402-pricing" x402SecretName = "x402-secrets" + + // DefaultFacilitatorURL is the Obol-operated x402 facilitator for payment + // verification and settlement. Supports Base Mainnet and Base Sepolia. + DefaultFacilitatorURL = "https://x402.gcp.obol.tech" ) var x402Manifest = mustReadX402Manifest() @@ -42,7 +46,7 @@ func EnsureVerifier(cfg *config.Config) error { // Setup configures x402 pricing in the cluster by patching the ConfigMap // and Secret. Stakater Reloader auto-restarts the verifier pod. -// If facilitatorURL is empty, the default (https://facilitator.x402.rs) is used. +// If facilitatorURL is empty, the Obol-operated facilitator is used. 
func Setup(cfg *config.Config, wallet, chain, facilitatorURL string) error { if err := ValidateWallet(wallet); err != nil { return err @@ -52,6 +56,10 @@ func Setup(cfg *config.Config, wallet, chain, facilitatorURL string) error { } bin, kc := kubectl.Paths(cfg) + // Populate the CA certificates bundle from the host so the distroless + // verifier image can TLS-verify the facilitator. + populateCABundle(bin, kc) + // 1. Patch the Secret with the wallet address. fmt.Printf("Configuring x402: setting wallet address...\n") secretPatch := map[string]any{"stringData": map[string]string{"WALLET_ADDRESS": wallet}} @@ -69,7 +77,7 @@ func Setup(cfg *config.Config, wallet, chain, facilitatorURL string) error { // static/manual routes. fmt.Printf("Updating x402 pricing config...\n") if facilitatorURL == "" { - facilitatorURL = "https://facilitator.x402.rs" + facilitatorURL = DefaultFacilitatorURL } existingCfg, _ := GetPricingConfig(cfg) var existingRoutes []RouteRule @@ -131,6 +139,50 @@ func GetPricingConfig(cfg *config.Config) (*PricingConfig, error) { return pcfg, nil } +// populateCABundle reads the host's CA certificate bundle and replaces +// the ca-certificates ConfigMap in the x402 namespace. The x402-verifier +// image is distroless and ships without a CA store, so TLS verification +// of external facilitators fails without this. +// +// Uses "kubectl create --dry-run | kubectl replace" instead of "kubectl +// apply" because the macOS CA bundle (~290KB) exceeds the 262KB +// annotation limit that kubectl apply requires. +func populateCABundle(bin, kc string) { + // Common CA bundle paths across Linux distros and macOS. 
+ candidates := []string{ + "/etc/ssl/certs/ca-certificates.crt", // Debian/Ubuntu + "/etc/pki/tls/certs/ca-bundle.crt", // RHEL/Fedora + "/etc/ssl/cert.pem", // macOS / Alpine + } + var caPath string + for _, path := range candidates { + if info, err := os.Stat(path); err == nil && info.Size() > 0 { + caPath = path + break + } + } + if caPath == "" { + return // no CA bundle found — skip silently + } + + // Pipe through kubectl create --dry-run to generate the ConfigMap YAML, + // then kubectl replace to apply it without the annotation size limit. + if err := kubectl.PipeCommands(bin, kc, + []string{"create", "configmap", "ca-certificates", "-n", x402Namespace, + "--from-file=ca-certificates.crt=" + caPath, + "--dry-run=client", "-o", "yaml"}, + []string{"replace", "-f", "-"}); err != nil { + return + } + + // Restart the verifier so it picks up the newly populated CA bundle. + // The ConfigMap is mounted as a volume; Kubernetes may take 60-120s to + // propagate changes, and we need TLS to work immediately for the + // facilitator connection. 
+ _ = kubectl.RunSilent(bin, kc, + "rollout", "restart", "deployment/x402-verifier", "-n", x402Namespace) +} + func patchPricingConfig(bin, kc string, pcfg *PricingConfig) error { pricingBytes, err := yaml.Marshal(pcfg) if err != nil { diff --git a/internal/x402/setup_test.go b/internal/x402/setup_test.go index 1e7722d8..ff8b7652 100644 --- a/internal/x402/setup_test.go +++ b/internal/x402/setup_test.go @@ -103,7 +103,7 @@ func TestPricingConfig_YAMLRoundTrip(t *testing.T) { original := PricingConfig{ Wallet: "0xGLOBALGLOBALGLOBALGLOBALGLOBALGLOBALGL", Chain: "base-sepolia", - FacilitatorURL: "https://facilitator.x402.rs", + FacilitatorURL: "https://x402.gcp.obol.tech", VerifyOnly: true, Routes: []RouteRule{ { @@ -188,7 +188,7 @@ func TestPricingConfig_YAMLWithPerRouteOverrides(t *testing.T) { pcfg := PricingConfig{ Wallet: "0xGLOBALGLOBALGLOBALGLOBALGLOBALGLOBALGL", Chain: "base-sepolia", - FacilitatorURL: "https://facilitator.x402.rs", + FacilitatorURL: "https://x402.gcp.obol.tech", Routes: []RouteRule{ { Pattern: "/inference-llama/v1/*", diff --git a/internal/x402/verifier.go b/internal/x402/verifier.go index ed4a3f57..fcfb020d 100644 --- a/internal/x402/verifier.go +++ b/internal/x402/verifier.go @@ -119,6 +119,7 @@ func (v *Verifier) HandleVerify(w http.ResponseWriter, r *http.Request) { Chain: chain, Amount: rule.Price, RecipientAddress: wallet, + Description: fmt.Sprintf("Payment required for %s", rule.Pattern), }) if err != nil { log.Printf("x402-verifier: failed to create payment requirement: %v", err) diff --git a/internal/x402/watcher_test.go b/internal/x402/watcher_test.go index 3e966bd9..e0de14e1 100644 --- a/internal/x402/watcher_test.go +++ b/internal/x402/watcher_test.go @@ -10,7 +10,7 @@ import ( const validWatcherYAML = `wallet: "0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef" chain: "base-sepolia" -facilitatorURL: "https://facilitator.x402.rs" +facilitatorURL: "https://x402.gcp.obol.tech" routes: - pattern: "/rpc/*" price: "0.0001" @@ -32,7 +32,7 @@ 
func TestWatchConfig_DetectsChange(t *testing.T) { v, err := NewVerifier(&PricingConfig{ Wallet: "0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef", Chain: "base-sepolia", - FacilitatorURL: "https://facilitator.x402.rs", + FacilitatorURL: "https://x402.gcp.obol.tech", Routes: []RouteRule{{Pattern: "/rpc/*", Price: "0.0001"}}, }) if err != nil { @@ -49,7 +49,7 @@ func TestWatchConfig_DetectsChange(t *testing.T) { // Write updated config with a new route. updatedYAML := `wallet: "0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef" chain: "base-sepolia" -facilitatorURL: "https://facilitator.x402.rs" +facilitatorURL: "https://x402.gcp.obol.tech" routes: - pattern: "/rpc/*" price: "0.0001" @@ -79,7 +79,7 @@ func TestWatchConfig_IgnoresUnchanged(t *testing.T) { v, err := NewVerifier(&PricingConfig{ Wallet: "0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef", Chain: "base-sepolia", - FacilitatorURL: "https://facilitator.x402.rs", + FacilitatorURL: "https://x402.gcp.obol.tech", Routes: []RouteRule{{Pattern: "/rpc/*", Price: "0.0001"}}, }) if err != nil { @@ -112,7 +112,7 @@ func TestWatchConfig_InvalidConfig(t *testing.T) { v, err := NewVerifier(&PricingConfig{ Wallet: "0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef", Chain: "base-sepolia", - FacilitatorURL: "https://facilitator.x402.rs", + FacilitatorURL: "https://x402.gcp.obol.tech", Routes: []RouteRule{{Pattern: "/rpc/*", Price: "0.0001"}}, }) if err != nil { @@ -149,7 +149,7 @@ func TestWatchConfig_CancelContext(t *testing.T) { v, err := NewVerifier(&PricingConfig{ Wallet: "0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef", Chain: "base-sepolia", - FacilitatorURL: "https://facilitator.x402.rs", + FacilitatorURL: "https://x402.gcp.obol.tech", Routes: []RouteRule{{Pattern: "/rpc/*", Price: "0.0001"}}, }) if err != nil { @@ -181,7 +181,7 @@ func TestWatchConfig_MissingFile(t *testing.T) { v, err := NewVerifier(&PricingConfig{ Wallet: "0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef", Chain: "base-sepolia", - FacilitatorURL: "https://facilitator.x402.rs", + 
FacilitatorURL: "https://x402.gcp.obol.tech", Routes: []RouteRule{{Pattern: "/rpc/*", Price: "0.0001"}}, }) if err != nil { diff --git a/obolup.sh b/obolup.sh index 07f938fd..2fe1b29b 100755 --- a/obolup.sh +++ b/obolup.sh @@ -110,7 +110,9 @@ command_exists() { check_prerequisites() { local missing=() - # Node.js 22+ / npm — required for openclaw CLI (unless already installed) + # Node.js 22+ / npm — preferred for openclaw CLI install. + # If missing, install_openclaw() will fall back to Docker image extraction. + # Only block here if neither npm NOR docker is available. local need_npm=true if command_exists openclaw; then local oc_version @@ -122,12 +124,17 @@ check_prerequisites() { if [[ "$need_npm" == "true" ]]; then if ! command_exists npm; then - missing+=("Node.js 22+ (npm) — required to install openclaw CLI") + if ! command_exists docker; then + missing+=("Node.js 22+ (npm) or Docker — required to install openclaw CLI") + fi + # npm missing but docker available — install_openclaw() will use Docker fallback else local node_major node_major=$(node --version 2>/dev/null | sed 's/v//' | cut -d. -f1) if [[ -z "$node_major" ]] || [[ "$node_major" -lt 22 ]]; then - missing+=("Node.js 22+ (found: v${node_major:-none}) — required for openclaw CLI") + if ! command_exists docker; then + missing+=("Node.js 22+ (found: v${node_major:-none}) or Docker — required for openclaw CLI") + fi fi fi fi @@ -1126,23 +1133,48 @@ install_openclaw() { return 0 fi - # Require Node.js 22+ and npm + # Prefer npm install; fall back to extracting the binary from Docker image. + local use_npm=true if ! command_exists npm; then - log_warn "npm not found — cannot install openclaw CLI" - echo "" - echo " Install Node.js 22+ first, then re-run obolup.sh" - echo " Or install manually: npm install -g openclaw@$target_version" - echo "" - return 1 + use_npm=false + else + local node_major + node_major=$(node --version 2>/dev/null | sed 's/v//' | cut -d. 
-f1) + if [[ -z "$node_major" ]] || [[ "$node_major" -lt 22 ]]; then + use_npm=false + fi fi - local node_major - node_major=$(node --version 2>/dev/null | sed 's/v//' | cut -d. -f1) - if [[ -z "$node_major" ]] || [[ "$node_major" -lt 22 ]]; then - log_warn "Node.js 22+ required for openclaw (found: v${node_major:-none})" + if [[ "$use_npm" == "false" ]]; then + # Docker fallback: extract openclaw binary from the published image. + if command_exists docker; then + log_info "npm/Node.js not available — extracting openclaw from Docker image..." + local image="ghcr.io/obolnetwork/openclaw:$target_version" + if docker pull "$image" 2>&1 | tail -1; then + local cid + cid=$(docker create "$image" 2>/dev/null) + if [[ -n "$cid" ]]; then + docker cp "$cid:/usr/local/bin/openclaw" "$OBOL_BIN_DIR/openclaw" 2>/dev/null \ + || docker cp "$cid:/app/openclaw" "$OBOL_BIN_DIR/openclaw" 2>/dev/null \ + || docker cp "$cid:/openclaw" "$OBOL_BIN_DIR/openclaw" 2>/dev/null + docker rm "$cid" >/dev/null 2>&1 + if [[ -f "$OBOL_BIN_DIR/openclaw" ]]; then + chmod +x "$OBOL_BIN_DIR/openclaw" + log_success "openclaw v$target_version installed (from Docker image)" + return 0 + fi + fi + fi + log_warn "Docker extraction failed" + echo " Pull the Docker image: docker pull $image" + echo "" + return 1 + fi + + log_warn "npm and Docker both unavailable — cannot install openclaw CLI" echo "" - echo " Upgrade Node.js, then re-run obolup.sh" - echo " Or install manually: npm install -g openclaw@$target_version" + echo " Install Node.js 22+ first, then re-run obolup.sh" + echo " Or pull the Docker image: docker pull ghcr.io/obolnetwork/openclaw:$target_version" echo "" return 1 fi