Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion apps/api-gateway/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
"@repo/logger": "workspace:*",
"@repo/prisma": "workspace:*",
"@repo/proto-defs": "workspace:*",
"@repo/utils": "workspace:*",
"dotenv": "^17.2.2",
"envalid": "^8.1.0",
"express": "^5.1.0",
Expand Down
6 changes: 3 additions & 3 deletions apps/api-gateway/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import orderRouter from "@/routers/order.route";
const app: Express = express();
app.use(express.json());

app.use("/api/v1/trade", tradeRouter);
app.use("/api/v1/user", userRouter);
app.use("/api/v1/order", orderRouter);
app.use("/api/v1/trades", tradeRouter);
app.use("/api/v1/users", userRouter);
app.use("/api/v1/orders", orderRouter);

app.get("/", (_, res) => {
res.json({ message: "Hello World from Nerve trade platform's backend" });
Expand Down
27 changes: 27 additions & 0 deletions apps/candle-service/.air.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
root = "."
tmp_dir = "tmp"

[build]
cmd = "go build -o ./tmp/candle-service ./cmd/candle-service"
bin = "tmp/candle-service"

include_ext = ["go"]

include_dir = [
".",
"../../packages/proto-defs/go/generated"
]

exclude_dir = [
"tmp",
"vendor",
"node_modules"
]

delay = 200

[run]
cmd = "./tmp/candle-service"

[log]
time = true
87 changes: 87 additions & 0 deletions apps/candle-service/cmd/candle-service/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package main

import (
"context"
"log/slog"
"net"
"os"
"os/signal"
"syscall"

"github.com/sameerkrdev/nerve/apps/candle-service/internal"
"github.com/sameerkrdev/nerve/apps/candle-service/internal/engine"
"github.com/sameerkrdev/nerve/apps/candle-service/internal/kafka"
memorystore "github.com/sameerkrdev/nerve/apps/candle-service/internal/memoryStore"
)

//* func: define grpc server and start consumer and workers
//* func: start the kafka consumer
//* func: start the workers
//* - each worker recieve gets single symbol trade data via channel
//* - calculate the candlestick data for multiple timeframe
//* - L1: In-memory (last 1000 candles)
//* - L2: Redis Memory (last 5000 candles)
// - L3: store the trades into clickhouse which will eventually generate the candles data
//* - Fanout:
//* - publish to kafka for other services
//* - redis pub/sub for websockets servers
// func: to get the historical data of candles
// func: graceful shutdown

// in main or in server, initialize router workers, then initialize kafka consumer handler then initialize kafka client with consume func call

func main() {
if err := memorystore.InitRedis(); err != nil {
slog.Error("redis init failed", "error", err)
os.Exit(1)
}

brokerAddresses := []string{"localhost:19092", "localhost:19093", "localhost:19094"}

if err := kafka.InitKafkaProducer(brokerAddresses); err != nil {
slog.Error("kafka producer init failed", "error", err)
os.Exit(1)
}

workerRouter := engine.NewWorkerRouter(10, kafka.PublishCandleEventToKafka)
// mux := internal.NewServer(workerRouter)

kafkaConsumerClient, err := kafka.NewKafkaConsumerClient(brokerAddresses)
if err != nil {
slog.Error("kafka consumer connection failed", "error", err)
os.Exit(1)
}

kafkaConsumerHandler := kafka.NewConsumerHandler(workerRouter)

topics := []string{
"trades",
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go kafkaConsumerClient.Consume(ctx, topics, kafkaConsumerHandler)

PORT := "50054"

listener, err := net.Listen("tcp", ":"+PORT)
if err != nil {
slog.Error("net server failed", "error", err)
os.Exit(1)
}

slog.Info("Net server listening", "port", PORT)

grpcServer := internal.NewGrpcServer(workerRouter, listener)

Comment on lines +75 to +76
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

gRPC serve errors are not surfaced to main for controlled handling.

At Line 75, server start is delegated to internal.NewGrpcServer(...), but that helper currently hard-exits via log.Fatalf on serve failure (apps/candle-service/internal/server.go:31). This bypasses coordinated shutdown logic in main.

Prefer returning serve errors (e.g., via channel or explicit Serve call in main) and handling termination policy centrally.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/candle-service/cmd/candle-service/main.go` around lines 75 - 76, The
gRPC helper NewGrpcServer currently hard-exits on Serve failure; change it to
return a server and a serve-starting function/error channel (e.g., return
*grpc.Server and net.Listener or a Start/Serve method that returns error)
instead of calling log.Fatalf inside internal.NewGrpcServer (or its Serve call),
and remove any direct os.Exit/log.Fatalf; then update main to call the returned
Serve (or listen on the error channel) and handle errors through main's
coordinated shutdown logic (invoke graceful stop on the returned *grpc.Server
and propagate/log the error centrally).

// graceful shutdown
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)

<-quit

slog.Info("shutting down...")

cancel()
grpcServer.GracefulStop()
}
Comment on lines +61 to +87
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Graceful shutdown is incomplete for async Kafka flow.

Line 63 starts consumption in a goroutine, but Lines 85-87 exit without waiting for that goroutine and without an explicit producer shutdown/flush path. This can lose in-flight events during SIGTERM.

Suggested direction
+import "sync"
...
-ctx, cancel := context.WithCancel(context.Background())
+ctx, cancel := context.WithCancel(context.Background())
 defer cancel()
-go kafkaConsumerClient.Consume(ctx, topics, kafkaConsumerHandler)
+var consumerWG sync.WaitGroup
+consumerWG.Add(1)
+go func() {
+  defer consumerWG.Done()
+  kafkaConsumerClient.Consume(ctx, topics, kafkaConsumerHandler)
+}()
...
 cancel()
+consumerWG.Wait()
+// call kafka producer shutdown helper here (flush/close) before process exits
 grpcServer.GracefulStop()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go kafkaConsumerClient.Consume(ctx, topics, kafkaConsumerHandler)
PORT := "50054"
listener, err := net.Listen("tcp", ":"+PORT)
if err != nil {
slog.Error("net server failed", "error", err)
os.Exit(1)
}
slog.Info("Net server listening", "port", PORT)
grpcServer := internal.NewGrpcServer(workerRouter, listener)
// graceful shutdown
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
slog.Info("shutting down...")
cancel()
grpcServer.GracefulStop()
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var consumerWG sync.WaitGroup
consumerWG.Add(1)
go func() {
defer consumerWG.Done()
kafkaConsumerClient.Consume(ctx, topics, kafkaConsumerHandler)
}()
PORT := "50054"
listener, err := net.Listen("tcp", ":"+PORT)
if err != nil {
slog.Error("net server failed", "error", err)
os.Exit(1)
}
slog.Info("Net server listening", "port", PORT)
grpcServer := internal.NewGrpcServer(workerRouter, listener)
// graceful shutdown
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
slog.Info("shutting down...")
cancel()
consumerWG.Wait()
// call kafka producer shutdown helper here (flush/close) before process exits
grpcServer.GracefulStop()
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/candle-service/cmd/candle-service/main.go` around lines 61 - 87, The
graceful shutdown currently cancels the context but does not wait for the Kafka
consumer goroutine or explicitly shut down any Kafka producer/flush path, so
in-flight messages can be lost; change the shutdown to create and track the
consumer goroutine with a sync.WaitGroup (or a returned done channel) so the
main goroutine waits for kafkaConsumerClient.Consume(ctx, topics,
kafkaConsumerHandler) to return before calling grpcServer.GracefulStop(), and
add explicit shutdown/flush calls for your Kafka client (e.g.,
kafkaConsumerClient.Close() or producer.Flush/Close) after cancel() and before
exiting; ensure kafkaConsumerClient.Consume, kafkaConsumerClient.Close (or the
producer Flush/Close) are invoked in that order and the WaitGroup/done is
awaited.

49 changes: 49 additions & 0 deletions apps/candle-service/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
module github.com/sameerkrdev/nerve/apps/candle-service

go 1.25.4

require (
github.com/IBM/sarama v1.46.3
github.com/redis/go-redis/v9 v9.18.0
google.golang.org/protobuf v1.36.11
)

require (
github.com/ClickHouse/ch-go v0.71.0 // indirect
github.com/ClickHouse/clickhouse-go/v2 v2.45.0 // indirect
github.com/andybalholm/brotli v1.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/eapache/go-resiliency v1.7.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/go-faster/city v1.0.1 // indirect
github.com/go-faster/errors v0.7.1 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.18.3 // indirect
github.com/paulmach/orb v0.12.0 // indirect
github.com/pierrec/lz4/v4 v4.1.25 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect
github.com/segmentio/asm v1.2.1 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
go.opentelemetry.io/otel v1.41.0 // indirect
go.opentelemetry.io/otel/trace v1.41.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/crypto v0.50.0 // indirect
golang.org/x/net v0.53.0 // indirect
golang.org/x/sync v0.20.0 // indirect
golang.org/x/sys v0.43.0 // indirect
golang.org/x/text v0.36.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260427160629-7cedc36a6bc4 // indirect
google.golang.org/grpc v1.80.0 // indirect
)
Loading