Skip to content

Commit a788ced

Browse files
authored
add tx submission support (#45)
* add tx submission support * add e2e test case * Add e2e submission CI job * rewrite tests and avoid passing private key directly to service * remove redundant tests * comments * use go square * Resolve outstanding PR comments
1 parent 925677d commit a788ced

62 files changed

Lines changed: 8189 additions & 94 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/ci.yml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,18 @@ jobs:
4141
with:
4242
go-version-file: go.mod
4343
- run: go build -o bin/apex ./cmd/apex
44+
45+
e2e-submission:
46+
name: E2E Submission
47+
runs-on: ubuntu-latest
48+
timeout-minutes: 30
49+
steps:
50+
- uses: actions/checkout@v4
51+
- uses: actions/setup-go@v5
52+
with:
53+
go-version-file: e2e/go.mod
54+
cache-dependency-path: |
55+
go.sum
56+
e2e/go.sum
57+
- working-directory: e2e
58+
run: go test -race -count=1 -timeout 20m -run TestSubmissionViaJSONRPC ./...

AGENTS.md

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
# Apex — Celestia Namespace Indexer
2+
3+
Lightweight indexer that watches Celestia namespaces, stores blobs/headers in SQLite, and exposes them via JSON-RPC, gRPC, and REST health endpoints. Includes Prometheus observability, a CLI client, and multi-stage Docker build.
4+
5+
## Build Commands
6+
7+
```bash
8+
just build # compile to bin/apex
9+
just test # go test -race ./...
10+
just lint # golangci-lint run
11+
just fmt # gofumpt -w .
12+
just check # tidy + lint + test + build (CI equivalent)
13+
just run # build and run
14+
just clean # remove bin/
15+
just tidy # go mod tidy
16+
```
17+
18+
## Architecture
19+
20+
### Data Flow
21+
22+
```text
23+
Celestia Node → Fetcher → Sync Coordinator → Store (SQLite)
24+
→ Notifier → Subscribers
25+
26+
API (JSON-RPC + gRPC + Health)
27+
```
28+
29+
The sync coordinator runs in two phases: **backfill** (historical blocks in batches) then **streaming** (live via header subscription). Height observers publish events to the notifier which fans out to API subscribers.
30+
31+
### File Structure
32+
33+
```text
34+
cmd/apex/
35+
main.go CLI entrypoint, server wiring, graceful shutdown
36+
client.go Thin HTTP JSON-RPC client for CLI commands
37+
status.go `apex status` command (health endpoint)
38+
blob_cmd.go `apex blob get|list` commands
39+
config_cmd.go `apex config validate|show` commands
40+
41+
config/
42+
config.go Config structs (DataSource, Storage, RPC, Sync, Metrics, Log)
43+
load.go YAML loading, validation, env var override, template generation
44+
45+
pkg/types/
46+
types.go Domain types: Namespace, Blob, Header, SyncState, SyncStatus
47+
48+
pkg/store/
49+
store.go Store interface (PutBlobs, GetBlobs, PutHeader, GetHeader, sync state)
50+
sqlite.go SQLite implementation with metrics instrumentation
51+
migrations/ SQL migration files
52+
53+
pkg/fetch/
54+
fetcher.go DataFetcher + ProofForwarder interfaces
55+
celestia_node.go Celestia node-api client (headers, blobs, subscriptions, proofs)
56+
celestia_app.go Celestia-app gRPC client (headers, blobs, polling subscription)
57+
58+
pkg/sync/
59+
coordinator.go Sync lifecycle: initialize → backfill → stream, tracks heights
60+
backfill.go Concurrent batch backfill with configurable batch size/concurrency
61+
subscription.go Header subscription manager for live streaming
62+
63+
pkg/api/
64+
service.go API service layer (blob/header queries, proof forwarding, subscriptions)
65+
notifier.go Event fan-out to subscribers with bounded buffers
66+
health.go /health and /health/ready HTTP endpoints, HealthStatus JSON
67+
jsonrpc/ JSON-RPC server (go-jsonrpc), blob/header/subscription handlers
68+
grpc/ gRPC server, protobuf service implementations
69+
gen/apex/v1/ Generated protobuf Go code
70+
gen/cosmos/base/tendermint/v1beta1/ Generated Cosmos CometBFT service client
71+
72+
pkg/metrics/
73+
metrics.go Recorder interface (nil-safe), nopRecorder, PromRecorder (Prometheus)
74+
server.go HTTP server for /metrics endpoint
75+
76+
proto/apex/v1/ Protobuf definitions (blob, header, types)
77+
proto/cosmos/base/tendermint/v1beta1/ Minimal Cosmos SDK CometBFT service proto
78+
79+
Dockerfile Multi-stage build (golang builder + distroless runtime)
80+
```
81+
82+
### Key Interfaces
83+
84+
- **`store.Store`** — persistence (SQLite impl, instrumented with metrics)
85+
- **`fetch.DataFetcher`** — block data retrieval (Celestia node JSON-RPC or celestia-app gRPC)
86+
- **`fetch.ProofForwarder`** — proof/inclusion forwarding to upstream node
87+
- **`metrics.Recorder`** — nil-safe metrics abstraction (Prometheus or no-op)
88+
- **`api.StatusProvider`** — sync status for health endpoints (implemented by coordinator)
89+
90+
### Ports (defaults)
91+
92+
| Port | Protocol | Purpose |
93+
|-------|----------|------------------|
94+
| :8080 | HTTP | JSON-RPC + health|
95+
| :9090 | TCP | gRPC |
96+
| :9091 | HTTP | Prometheus /metrics |
97+
98+
### Config
99+
100+
YAML with strict unknown-field rejection. Auth token via `APEX_AUTH_TOKEN` env var only (not in config file). See `config/config.go` for all fields and `DefaultConfig()` for defaults.
101+
102+
## Conventions
103+
104+
- Go 1.25+ (`go.mod` specifies 1.25.0)
105+
- SQLite via `modernc.org/sqlite` (CGo-free)
106+
- Config: YAML (`gopkg.in/yaml.v3`), strict unknown-field rejection
107+
- Logging: `rs/zerolog`
108+
- CLI: `spf13/cobra`
109+
- Metrics: `prometheus/client_golang` behind nil-safe `Recorder` interface
110+
- JSON-RPC: `filecoin-project/go-jsonrpc`
111+
- gRPC: `google.golang.org/grpc` + `google.golang.org/protobuf`
112+
- Protobuf codegen: `buf` (`buf.yaml` + `buf.gen.yaml`)
113+
- Linter: golangci-lint v2 (.golangci.yml v2 format), gocyclo max 15
114+
- Formatter: gofumpt
115+
- Build runner: just (justfile)
116+
117+
## Dependencies
118+
119+
- Only add deps that are strictly necessary
120+
- Prefer stdlib where reasonable
121+
- No CGo dependencies (cross-compilation constraint)
122+
123+
## Testing
124+
125+
- All tests use `-race`
126+
- Table-driven tests preferred
127+
- Test files alongside source (`_test.go`)
128+
- No test frameworks beyond stdlib `testing`

buf.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ modules:
44
lint:
55
use:
66
- DEFAULT
7+
ignore_only:
8+
PACKAGE_VERSION_SUFFIX:
9+
- proto/cosmos/crypto/secp256k1/keys.proto
710
breaking:
811
use:
912
- FILE

cmd/apex/main.go

Lines changed: 99 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/evstack/apex/pkg/metrics"
2626
"github.com/evstack/apex/pkg/profile"
2727
"github.com/evstack/apex/pkg/store"
28+
"github.com/evstack/apex/pkg/submit"
2829
syncer "github.com/evstack/apex/pkg/sync"
2930
"github.com/evstack/apex/pkg/types"
3031
)
@@ -113,6 +114,7 @@ func startCmd() *cobra.Command {
113114
startLog := log.Info().
114115
Str("version", version).
115116
Str("datasource_type", cfg.DataSource.Type).
117+
Bool("submission_enabled", cfg.Submission.Enabled).
116118
Int("namespaces", len(cfg.DataSource.Namespaces))
117119
if cfg.DataSource.Type == dataSourceTypeApp {
118120
startLog = startLog.Str("app_grpc_addr", cfg.DataSource.CelestiaAppGRPCAddr)
@@ -171,7 +173,12 @@ func setupProfiling(cfg *config.Config) *profile.Server {
171173
func openDataSource(ctx context.Context, cfg *config.Config) (fetch.DataFetcher, fetch.ProofForwarder, error) {
172174
switch cfg.DataSource.Type {
173175
case dataSourceTypeApp:
174-
appFetcher, err := fetch.NewCelestiaAppFetcher(cfg.DataSource.CelestiaAppGRPCAddr, cfg.DataSource.AuthToken, log.Logger)
176+
appFetcher, err := fetch.NewCelestiaAppFetcher(
177+
cfg.DataSource.CelestiaAppGRPCAddr,
178+
cfg.DataSource.AuthToken,
179+
cfg.DataSource.CelestiaAppGRPCInsecure,
180+
log.Logger,
181+
)
175182
if err != nil {
176183
return nil, nil, fmt.Errorf("create celestia-app fetcher: %w", err)
177184
}
@@ -275,33 +282,18 @@ func runIndexer(ctx context.Context, cfg *config.Config) error {
275282
}
276283
defer dataFetcher.Close() //nolint:errcheck
277284

278-
// Set up API layer.
279-
notifier := api.NewNotifier(cfg.Subscription.BufferSize, cfg.Subscription.MaxSubscribers, log.Logger)
280-
notifier.SetMetrics(rec)
281-
svc := api.NewService(db, dataFetcher, proofFwd, notifier, log.Logger)
282-
283-
// Build and run the sync coordinator with observer hook.
284-
coordOpts := []syncer.Option{
285-
syncer.WithStartHeight(cfg.Sync.StartHeight),
286-
syncer.WithBatchSize(cfg.Sync.BatchSize),
287-
syncer.WithConcurrency(cfg.Sync.Concurrency),
288-
syncer.WithLogger(log.Logger),
289-
syncer.WithMetrics(rec),
290-
syncer.WithObserver(func(h uint64, hdr *types.Header, blobs []types.Blob) {
291-
notifier.Publish(api.HeightEvent{Height: h, Header: hdr, Blobs: blobs})
292-
}),
285+
svc, notifier, closeSubmitter, err := setupAPIService(cfg, db, dataFetcher, proofFwd, rec)
286+
if err != nil {
287+
return err
293288
}
289+
defer closeSubmitter()
294290

295-
backfillOpt, closeBackfill, err := maybeBackfillSourceOption(cfg, log.Logger)
291+
// Build and run the sync coordinator with observer hook.
292+
coordOpts, closeBackfill, err := buildCoordinatorOptions(cfg, notifier, rec)
296293
if err != nil {
297294
return err
298295
}
299-
if closeBackfill != nil {
300-
defer closeBackfill()
301-
}
302-
if backfillOpt != nil {
303-
coordOpts = append(coordOpts, backfillOpt)
304-
}
296+
defer closeBackfill()
305297

306298
coord := syncer.New(db, dataFetcher, coordOpts...)
307299

@@ -360,6 +352,90 @@ func runIndexer(ctx context.Context, cfg *config.Config) error {
360352
return nil
361353
}
362354

355+
func openBlobSubmitter(cfg *config.Config) (*submit.DirectSubmitter, error) {
356+
if !cfg.Submission.Enabled {
357+
return nil, nil
358+
}
359+
360+
appClient, err := submit.NewGRPCAppClient(
361+
cfg.Submission.CelestiaAppGRPCAddr,
362+
cfg.Submission.CelestiaAppGRPCInsecure,
363+
)
364+
if err != nil {
365+
return nil, fmt.Errorf("create submission app client: %w", err)
366+
}
367+
368+
signer, err := submit.LoadSigner(cfg.Submission.SignerKey)
369+
if err != nil {
370+
_ = appClient.Close()
371+
return nil, fmt.Errorf("load submission signer: %w", err)
372+
}
373+
374+
blobSubmitter, err := submit.NewDirectSubmitter(appClient, signer, submit.DirectConfig{
375+
ChainID: cfg.Submission.ChainID,
376+
GasPrice: cfg.Submission.GasPrice,
377+
MaxGasPrice: cfg.Submission.MaxGasPrice,
378+
ConfirmationTimeout: time.Duration(cfg.Submission.ConfirmationTimeout) * time.Second,
379+
})
380+
if err != nil {
381+
_ = appClient.Close()
382+
return nil, fmt.Errorf("configure submission backend: %w", err)
383+
}
384+
385+
return blobSubmitter, nil
386+
}
387+
388+
func setupAPIService(cfg *config.Config, db store.Store, dataFetcher fetch.DataFetcher, proofFwd fetch.ProofForwarder, rec metrics.Recorder) (*api.Service, *api.Notifier, func(), error) {
389+
blobSubmitter, err := openBlobSubmitter(cfg)
390+
if err != nil {
391+
return nil, nil, nil, err
392+
}
393+
394+
closeSubmitter := func() {}
395+
if blobSubmitter != nil {
396+
closeSubmitter = func() {
397+
_ = blobSubmitter.Close()
398+
}
399+
}
400+
401+
notifier := api.NewNotifier(cfg.Subscription.BufferSize, cfg.Subscription.MaxSubscribers, log.Logger)
402+
notifier.SetMetrics(rec)
403+
404+
svcOpts := make([]api.ServiceOption, 0, 1)
405+
if blobSubmitter != nil {
406+
svcOpts = append(svcOpts, api.WithBlobSubmitter(blobSubmitter))
407+
}
408+
409+
svc := api.NewService(db, dataFetcher, proofFwd, notifier, log.Logger, svcOpts...)
410+
return svc, notifier, closeSubmitter, nil
411+
}
412+
413+
func buildCoordinatorOptions(cfg *config.Config, notifier *api.Notifier, rec metrics.Recorder) ([]syncer.Option, func(), error) {
414+
coordOpts := []syncer.Option{
415+
syncer.WithStartHeight(cfg.Sync.StartHeight),
416+
syncer.WithBatchSize(cfg.Sync.BatchSize),
417+
syncer.WithConcurrency(cfg.Sync.Concurrency),
418+
syncer.WithLogger(log.Logger),
419+
syncer.WithMetrics(rec),
420+
syncer.WithObserver(func(h uint64, hdr *types.Header, blobs []types.Blob) {
421+
notifier.Publish(api.HeightEvent{Height: h, Header: hdr, Blobs: blobs})
422+
}),
423+
}
424+
425+
backfillOpt, closeBackfill, err := maybeBackfillSourceOption(cfg, log.Logger)
426+
if err != nil {
427+
return nil, nil, err
428+
}
429+
if closeBackfill == nil {
430+
closeBackfill = func() {}
431+
}
432+
if backfillOpt != nil {
433+
coordOpts = append(coordOpts, backfillOpt)
434+
}
435+
436+
return coordOpts, closeBackfill, nil
437+
}
438+
363439
func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *metrics.Server, profileSrv *profile.Server) {
364440
stopped := make(chan struct{})
365441
go func() {

config/config.go

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,23 @@ type Config struct {
1616
Metrics MetricsConfig `yaml:"metrics"`
1717
Profiling ProfilingConfig `yaml:"profiling"`
1818
Log LogConfig `yaml:"log"`
19+
Submission SubmissionConfig `yaml:"submission"`
1920
}
2021

2122
// DataSourceConfig configures the Celestia data source.
2223
// Type selects the backend: "node" (default) uses a Celestia DA node,
2324
// "app" uses a celestia-app consensus node via Cosmos SDK gRPC.
2425
type DataSourceConfig struct {
25-
Type string `yaml:"type"` // "node" (default) or "app"
26-
CelestiaNodeURL string `yaml:"celestia_node_url"`
27-
CelestiaAppGRPCAddr string `yaml:"celestia_app_grpc_addr"`
28-
BackfillSource string `yaml:"backfill_source"` // "rpc" (default) or "db" for app mode
29-
CelestiaAppDBPath string `yaml:"celestia_app_db_path"` // required when backfill_source=db
30-
CelestiaAppDBBackend string `yaml:"celestia_app_db_backend"` // auto|pebble|leveldb
31-
CelestiaAppDBLayout string `yaml:"celestia_app_db_layout"` // auto|v1|v2
32-
AuthToken string `yaml:"-"` //nolint:gosec // populated only via APEX_AUTH_TOKEN env var; not a hardcoded credential
33-
Namespaces []string `yaml:"namespaces"`
26+
Type string `yaml:"type"` // "node" (default) or "app"
27+
CelestiaNodeURL string `yaml:"celestia_node_url"`
28+
CelestiaAppGRPCAddr string `yaml:"celestia_app_grpc_addr"`
29+
CelestiaAppGRPCInsecure bool `yaml:"celestia_app_grpc_insecure"` // allow plaintext gRPC to non-loopback celestia-app endpoints
30+
BackfillSource string `yaml:"backfill_source"` // "rpc" (default) or "db" for app mode
31+
CelestiaAppDBPath string `yaml:"celestia_app_db_path"` // required when backfill_source=db
32+
CelestiaAppDBBackend string `yaml:"celestia_app_db_backend"` // auto|pebble|leveldb
33+
CelestiaAppDBLayout string `yaml:"celestia_app_db_layout"` // auto|v1|v2
34+
AuthToken string `yaml:"-"` //nolint:gosec // populated only via APEX_AUTH_TOKEN env var; not a hardcoded credential
35+
Namespaces []string `yaml:"namespaces"`
3436
}
3537

3638
// StorageConfig configures the persistence backend.
@@ -92,6 +94,18 @@ type LogConfig struct {
9294
Format string `yaml:"format"`
9395
}
9496

97+
// SubmissionConfig contains settings for the future blob submission pipeline.
98+
type SubmissionConfig struct {
99+
Enabled bool `yaml:"enabled"`
100+
CelestiaAppGRPCAddr string `yaml:"app_grpc_addr"`
101+
CelestiaAppGRPCInsecure bool `yaml:"app_grpc_insecure"` // allow plaintext gRPC to non-loopback celestia-app endpoints
102+
ChainID string `yaml:"chain_id"`
103+
SignerKey string `yaml:"signer_key"` // path to a file containing the hex-encoded secp256k1 key
104+
GasPrice float64 `yaml:"gas_price"` // 0 means unset; callers must provide gas_price per request
105+
MaxGasPrice float64 `yaml:"max_gas_price"` // 0 disables the max gas price cap
106+
ConfirmationTimeout int `yaml:"confirmation_timeout"` // seconds
107+
}
108+
95109
// DefaultConfig returns a Config with sensible defaults.
96110
func DefaultConfig() Config {
97111
return Config{
@@ -120,6 +134,15 @@ func DefaultConfig() Config {
120134
BufferSize: 64,
121135
MaxSubscribers: 1024,
122136
},
137+
Submission: SubmissionConfig{
138+
Enabled: false,
139+
CelestiaAppGRPCAddr: "",
140+
ChainID: "",
141+
SignerKey: "",
142+
GasPrice: 0,
143+
MaxGasPrice: 0,
144+
ConfirmationTimeout: 30,
145+
},
123146
Metrics: MetricsConfig{
124147
Enabled: true,
125148
ListenAddr: ":9091",
@@ -143,6 +166,9 @@ func (c *Config) ParsedNamespaces() ([]types.Namespace, error) {
143166
if err != nil {
144167
return nil, fmt.Errorf("invalid namespace %q: %w", hex, err)
145168
}
169+
if err := ns.ValidateForBlob(); err != nil {
170+
return nil, fmt.Errorf("invalid namespace %q: %w", hex, err)
171+
}
146172
namespaces = append(namespaces, ns)
147173
}
148174
return namespaces, nil

0 commit comments

Comments
 (0)