forked from Protocol-Lattice/GoEventBus
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtransaction_test.go
More file actions
128 lines (115 loc) · 3.44 KB
/
transaction_test.go
File metadata and controls
128 lines (115 loc) · 3.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package GoEventBus
import (
"context"
"errors"
"sync/atomic"
"testing"
)
// dummy handler just increments a counter
func makeCounterHandler(counter *uint64) HandlerFunc {
return func(ctx context.Context, args map[string]any) (Result, error) {
atomic.AddUint64(counter, 1)
return Result{Message: "ok"}, nil
}
}
func TestTransaction_CommitAndRollback(t *testing.T) {
// set up dispatcher with a no-op projection
var processed uint64
dispatcher := Dispatcher{
"p": makeCounterHandler(&processed),
}
es := NewEventStore(&dispatcher, 8, DropOldest)
ctx := context.Background()
// Test Commit
tx := es.BeginTransaction()
tx.Publish(Event{ID: "1", Projection: "p", Args: map[string]any{}})
tx.Publish(Event{ID: "2", Projection: "p", Args: map[string]any{}})
if err := tx.Commit(ctx); err != nil {
t.Fatalf("Commit failed: %v", err)
}
// actually publish into store
es.Publish()
if got := atomic.LoadUint64(&processed); got != 2 {
t.Errorf("expected 2 events processed, got %d", got)
}
// Test Rollback
processed = 0
tx2 := es.BeginTransaction()
tx2.Publish(Event{ID: "3", Projection: "p", Args: map[string]any{}})
tx2.Rollback()
if err := tx2.Commit(ctx); err != nil {
t.Fatalf("Commit after rollback should not error, got %v", err)
}
es.Publish()
if got := atomic.LoadUint64(&processed); got != 0 {
t.Errorf("expected 0 events after rollback, got %d", got)
}
}
func TestTransaction_PartialFailure(t *testing.T) {
// handler that errors on the second event
cnt := uint64(0)
dispatcher := Dispatcher{
"x": func(ctx context.Context, args map[string]any) (Result, error) {
i := atomic.AddUint64(&cnt, 1)
if i == 2 {
return Result{}, errors.New("boom")
}
return Result{Message: "ok"}, nil
},
}
es := NewEventStore(&dispatcher, 4, ReturnError)
ctx := context.Background()
tx := es.BeginTransaction()
tx.Publish(Event{ID: "a", Projection: "x", Args: map[string]any{}})
tx.Publish(Event{ID: "b", Projection: "x", Args: map[string]any{}})
err := tx.Commit(ctx)
if err == nil {
t.Fatal("expected Commit to return error on second event, got nil")
}
if err.Error() != "goeventbus: buffer is full" && err.Error() != "boom" {
// Depending on overrun policy, could bubble Subscribe error or handler error
t.Fatalf("unexpected error: %v", err)
}
}
func BenchmarkTransaction_SyncCommit(b *testing.B) {
dispatcher := Dispatcher{
"p": func(ctx context.Context, args map[string]any) (Result, error) {
return Result{Message: "ok"}, nil
},
}
es := NewEventStore(&dispatcher, 256, DropOldest)
es.Async = false
b.ResetTimer()
for i := 0; i < b.N; i++ {
tx := es.BeginTransaction()
// buffer N events per transaction
for j := 0; j < 16; j++ {
tx.Publish(Event{ID: "t", Projection: "p", Args: map[string]any{}})
}
if err := tx.Commit(context.Background()); err != nil {
b.Fatalf("Commit error: %v", err)
}
es.Publish()
}
}
func BenchmarkTransaction_AsyncCommit(b *testing.B) {
dispatcher := Dispatcher{
"p": func(ctx context.Context, args map[string]any) (Result, error) {
return Result{Message: "ok"}, nil
},
}
es := NewEventStore(&dispatcher, 256, DropOldest)
es.Async = true
b.ResetTimer()
for i := 0; i < b.N; i++ {
tx := es.BeginTransaction()
for j := 0; j < 16; j++ {
tx.Publish(Event{ID: "t", Projection: "p", Args: map[string]any{}})
}
if err := tx.Commit(context.Background()); err != nil {
b.Fatalf("Commit error: %v", err)
}
es.Publish()
es.Drain(context.Background())
}
}