diff --git a/temporal/job/definition.go b/temporal/job/definition.go new file mode 100644 index 0000000..24e6d4c --- /dev/null +++ b/temporal/job/definition.go @@ -0,0 +1,181 @@ +package job + +import ( + "context" + "fmt" + "sync" + + "github.com/google/uuid" + "go.temporal.io/sdk/activity" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/workflow" +) + +// Definition is a type-focused description of one registered Temporal workflow. +// All per-job operations hang off the type as methods. +type Definition struct { + Name string + TaskQueue string + Description string + Tags []string + Schedule *ScheduleSpec + + // Private wiring set only by New via Option closures. + register func(worker.Worker) + execute func(ctx context.Context, c client.Client, opts client.StartWorkflowOptions, input any) (client.WorkflowRun, error) + newInput func() any +} + +// Option configures a Definition during construction. +type Option func(*Definition) + +// WithRegister sets the worker-registration closure. +func WithRegister(fn func(worker.Worker)) Option { + return func(d *Definition) { d.register = fn } +} + +// WithExecute sets the workflow-execution closure. The closure receives a +// pre-built client.StartWorkflowOptions (ID + TaskQueue + caller overrides) +// and the typed input value. +func WithExecute(fn func(ctx context.Context, c client.Client, opts client.StartWorkflowOptions, input any) (client.WorkflowRun, error)) Option { + return func(d *Definition) { d.execute = fn } +} + +// WithNewInput sets the factory that returns a typed zero value of the +// workflow input. Callers fill the value before calling Execute. +func WithNewInput(fn func() any) Option { + return func(d *Definition) { d.newInput = fn } +} + +// WithSchedule attaches an optional schedule specification. +func WithSchedule(spec *ScheduleSpec) Option { + return func(d *Definition) { d.Schedule = spec } +} + +// WithDescription attaches a human-readable description. +func WithDescription(desc string) Option { + return func(d *Definition) { d.Description = desc } +} + +// WithTags attaches user-defined tags. +func WithTags(tags ...string) Option { + return func(d *Definition) { d.Tags = tags } +} + +// New constructs a Definition. Validates name, task queue, all closures, and +// the optional schedule. Returns ErrInvalidDefinition if anything is missing +// or inconsistent. +func New(name, taskQueue string, opts ...Option) (*Definition, error) { + if name == "" { + return nil, fmt.Errorf("%w: name required", ErrInvalidDefinition) + } + if taskQueue == "" { + return nil, fmt.Errorf("%w: task queue required", ErrInvalidDefinition) + } + d := &Definition{Name: name, TaskQueue: taskQueue} + for _, opt := range opts { + opt(d) + } + if d.register == nil || d.execute == nil || d.newInput == nil { + return nil, fmt.Errorf("%w: WithRegister, WithExecute, and WithNewInput are all required", ErrInvalidDefinition) + } + if d.Schedule != nil { + if err := d.Schedule.validate(); err != nil { + return nil, fmt.Errorf("%w: %s", ErrInvalidDefinition, err) + } + } + return d, nil +} + +// NewInput returns a fresh typed zero value for this Definition's workflow +// input. Callers fill it before calling Execute (e.g., via json.Unmarshal). +func (d *Definition) NewInput() any { + return d.newInput() +} + +// Register wires the workflow and its activities onto a worker. Safe to call +// concurrently and multiple times — the builder-supplied register closure is +// expected to use RegisterWorkflowOnce / RegisterActivityOnce for idempotency +// when the underlying workflow type may be shared across Definitions. +func (d *Definition) Register(w worker.Worker) { + if d.register == nil { + return + } + d.register(w) +} + +// Execute starts a workflow run. The workflow ID defaults to "-" +// unless overridden via WithWorkflowID(...). +func (d *Definition) Execute(ctx context.Context, c client.Client, input any, opts ...ExecuteOption) (RunHandle, error) { + if d.execute == nil { + return RunHandle{}, ErrNotRegistered + } + var cfg executeConfig + for _, opt := range opts { + opt(&cfg) + } + defaultID := d.Name + "-" + uuid.NewString() + sdkOpts := cfg.apply(defaultID, d.TaskQueue) + run, err := d.execute(ctx, c, sdkOpts, input) + if err != nil { + return RunHandle{}, translateSDKError("execute", err) + } + return RunHandle{ + WorkflowID: run.GetID(), + RunID: run.GetRunID(), + raw: run, + }, nil +} + +// GetRun returns a RunHandle for an existing workflow run identified by +// wfID and runID (runID "" = latest). Useful when reattaching to a run +// triggered elsewhere. +func (d *Definition) GetRun(c client.Client, wfID, runID string) RunHandle { + if c == nil { + return RunHandle{WorkflowID: wfID, RunID: runID} + } + run := c.GetWorkflow(context.Background(), wfID, runID) + return RunHandle{WorkflowID: wfID, RunID: runID, raw: run} +} + +// --- Dedup helpers used by builders' Register closures --- + +type registrarKey struct { + worker worker.Worker + typeName string +} + +var ( + registeredWorkflows sync.Map + registeredActivities sync.Map +) + +// RegisterWorkflowOnce registers a workflow on a worker, returning silently +// if the (worker, typeName) pair has already been registered. Used by +// builder packages to make their RegisterAll-style helpers idempotent. +func RegisterWorkflowOnce(w worker.Worker, typeName string, wf any, opts workflow.RegisterOptions) { + key := registrarKey{w, typeName} + if _, loaded := registeredWorkflows.LoadOrStore(key, struct{}{}); loaded { + return + } + w.RegisterWorkflowWithOptions(wf, opts) +} + +// RegisterActivityOnce registers an activity on a worker idempotently. +// Activity name comes from opts.Name; pass empty Name only for typed-function +// activities (rare in this codebase). +func RegisterActivityOnce(w worker.Worker, typeName string, fn any, opts activity.RegisterOptions) { + if typeName == "" { + typeName = opts.Name + } + if typeName == "" { + // Fallback: this should not happen in this codebase but provides safety. + typeName = fmt.Sprintf("%T", fn) + } + key := registrarKey{w, typeName} + if _, loaded := registeredActivities.LoadOrStore(key, struct{}{}); loaded { + return + } + w.RegisterActivityWithOptions(fn, opts) +} diff --git a/temporal/job/definition_integration_test.go b/temporal/job/definition_integration_test.go new file mode 100644 index 0000000..d3455d3 --- /dev/null +++ b/temporal/job/definition_integration_test.go @@ -0,0 +1,150 @@ +//go:build integration + +package job + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.temporal.io/sdk/activity" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/workflow" + + "github.com/jasoet/pkg/v2/temporal/testcontainer" +) + +func echoWorkflow(ctx workflow.Context, in string) (string, error) { + var out string + if err := workflow.ExecuteActivity(workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Second, + }), echoActivity, in).Get(ctx, &out); err != nil { + return "", err + } + return out, nil +} + +func echoActivity(_ context.Context, in string) (string, error) { return in, nil } + +func setupTestDef(t *testing.T, c client.Client, w worker.Worker) *Definition { + t.Helper() + d, err := New("echo", "echo-tq", + WithRegister(func(w worker.Worker) { + RegisterWorkflowOnce(w, "echo", echoWorkflow, workflow.RegisterOptions{Name: "echo"}) + RegisterActivityOnce(w, "echoActivity", echoActivity, activity.RegisterOptions{Name: "echoActivity"}) + }), + WithExecute(func(ctx context.Context, c client.Client, opts client.StartWorkflowOptions, in any) (client.WorkflowRun, error) { + return c.ExecuteWorkflow(ctx, opts, "echo", in) + }), + WithNewInput(func() any { var s string; return &s }), + ) + require.NoError(t, err) + d.Register(w) + return d +} + +func TestIntegration_Definition_Execute_Describe_History(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + defer cancel() + + tc, c, cleanup, err := testcontainer.Setup(ctx, testcontainer.ClientConfig{Namespace: "default"}, testcontainer.Options{}) + require.NoError(t, err) + defer cleanup() + _ = tc + + w := worker.New(c, "echo-tq", worker.Options{}) + d := setupTestDef(t, c, w) + require.NoError(t, w.Start()) + defer w.Stop() + + run, err := d.Execute(ctx, c, "hello world") + require.NoError(t, err) + assert.True(t, len(run.WorkflowID) > 5, "workflow ID has prefix") + + var got string + require.NoError(t, run.Get(ctx, &got)) + assert.Equal(t, "hello world", got) + + detail, err := d.Describe(ctx, c, run.WorkflowID, run.RunID) + require.NoError(t, err) + assert.Equal(t, StatusCompleted, detail.Status) + assert.Equal(t, "echo", detail.Type) + + hist, err := d.History(ctx, c, run.WorkflowID, run.RunID, HistoryOpts{}) + require.NoError(t, err) + assert.NotEmpty(t, hist.Activities) +} + +func TestIntegration_Definition_Cancel(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + defer cancel() + + tc, c, cleanup, err := testcontainer.Setup(ctx, testcontainer.ClientConfig{Namespace: "default"}, testcontainer.Options{}) + require.NoError(t, err) + defer cleanup() + _ = tc + + longWf := func(ctx workflow.Context, _ string) error { + return workflow.Sleep(ctx, 1*time.Hour) + } + + d, err := New("long", "long-tq", + WithRegister(func(w worker.Worker) { + RegisterWorkflowOnce(w, "long", longWf, workflow.RegisterOptions{Name: "long"}) + }), + WithExecute(func(ctx context.Context, c client.Client, opts client.StartWorkflowOptions, in any) (client.WorkflowRun, error) { + return c.ExecuteWorkflow(ctx, opts, "long", in) + }), + WithNewInput(func() any { var s string; return &s }), + ) + require.NoError(t, err) + + w := worker.New(c, "long-tq", worker.Options{}) + d.Register(w) + require.NoError(t, w.Start()) + defer w.Stop() + + run, err := d.Execute(ctx, c, "x") + require.NoError(t, err) + + require.NoError(t, d.Cancel(ctx, c, run.WorkflowID, run.RunID)) + + // Wait a moment for cancellation to propagate + time.Sleep(2 * time.Second) + detail, err := d.Describe(ctx, c, run.WorkflowID, run.RunID) + require.NoError(t, err) + assert.Equal(t, StatusCanceled, detail.Status) +} + +func TestIntegration_Definition_ListRuns_ScopedByName(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + defer cancel() + + tc, c, cleanup, err := testcontainer.Setup(ctx, testcontainer.ClientConfig{Namespace: "default"}, testcontainer.Options{}) + require.NoError(t, err) + defer cleanup() + _ = tc + + w := worker.New(c, "echo-tq", worker.Options{}) + d := setupTestDef(t, c, w) + require.NoError(t, w.Start()) + defer w.Stop() + + // Run twice. + r1, err := d.Execute(ctx, c, "one") + require.NoError(t, err) + require.NoError(t, r1.Get(ctx, new(string))) + r2, err := d.Execute(ctx, c, "two") + require.NoError(t, err) + require.NoError(t, r2.Get(ctx, new(string))) + + // Visibility settles asynchronously. + time.Sleep(2 * time.Second) + + page, err := d.ListRuns(ctx, c, ListOpts{PageSize: 10}) + require.NoError(t, err) + assert.GreaterOrEqual(t, len(page.Runs), 2, "ListRuns scoped by Name prefix returns both runs") +} diff --git a/temporal/job/definition_schedule.go b/temporal/job/definition_schedule.go new file mode 100644 index 0000000..c9fe80e --- /dev/null +++ b/temporal/job/definition_schedule.go @@ -0,0 +1,132 @@ +package job + +import ( + "context" + "fmt" + + "go.temporal.io/sdk/client" +) + +// ApplySchedule creates or updates the Temporal schedule for this Definition. +// Schedule ID equals Definition.Name. If a schedule with that ID already +// exists, it is updated to match the current ScheduleSpec. +func (d *Definition) ApplySchedule(ctx context.Context, c client.Client) error { + if d.Schedule == nil { + return ErrNoSchedule + } + spec, err := d.Schedule.toSDKSpec() + if err != nil { + return fmt.Errorf("schedule: %w", err) + } + sc := c.ScheduleClient() + + // Check existence by trying to describe. + handle := sc.GetHandle(ctx, d.Name) + _, descErr := handle.Describe(ctx) + if descErr == nil { + // Update path + return translateSDKError("schedule-update", handle.Update(ctx, client.ScheduleUpdateOptions{ + DoUpdate: func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) { + return &client.ScheduleUpdate{ + Schedule: &client.Schedule{ + Spec: &spec, + Action: scheduleAction(d), + Policy: &client.SchedulePolicies{ + Overlap: d.Schedule.Overlap.ToSDK(), + }, + State: &client.ScheduleState{ + Paused: d.Schedule.Paused, + Note: d.Schedule.Note, + }, + }, + }, nil + }, + })) + } + + // Create path + _, err = sc.Create(ctx, client.ScheduleOptions{ + ID: d.Name, + Spec: spec, + Action: scheduleAction(d), + Overlap: d.Schedule.Overlap.ToSDK(), + Paused: d.Schedule.Paused, + Note: d.Schedule.Note, + }) + return translateSDKError("schedule-create", err) +} + +func scheduleAction(d *Definition) *client.ScheduleWorkflowAction { + return &client.ScheduleWorkflowAction{ + ID: d.Name + "-scheduled", + Workflow: d.Name, + TaskQueue: d.TaskQueue, + } +} + +// PauseSchedule pauses an existing schedule. +func (d *Definition) PauseSchedule(ctx context.Context, c client.Client, note string) error { + if d.Schedule == nil { + return ErrNoSchedule + } + handle := c.ScheduleClient().GetHandle(ctx, d.Name) + return translateSDKError("schedule-pause", handle.Pause(ctx, client.SchedulePauseOptions{Note: note})) +} + +// ResumeSchedule unpauses an existing schedule. +func (d *Definition) ResumeSchedule(ctx context.Context, c client.Client, note string) error { + if d.Schedule == nil { + return ErrNoSchedule + } + handle := c.ScheduleClient().GetHandle(ctx, d.Name) + return translateSDKError("schedule-resume", handle.Unpause(ctx, client.ScheduleUnpauseOptions{Note: note})) +} + +// TriggerSchedule fires an immediate run of the schedule's action. +func (d *Definition) TriggerSchedule(ctx context.Context, c client.Client) error { + if d.Schedule == nil { + return ErrNoSchedule + } + handle := c.ScheduleClient().GetHandle(ctx, d.Name) + return translateSDKError("schedule-trigger", handle.Trigger(ctx, client.ScheduleTriggerOptions{})) +} + +// DeleteSchedule removes the schedule from Temporal. +func (d *Definition) DeleteSchedule(ctx context.Context, c client.Client) error { + handle := c.ScheduleClient().GetHandle(ctx, d.Name) + return translateSDKError("schedule-delete", handle.Delete(ctx)) +} + +// DescribeSchedule returns the current schedule state. +func (d *Definition) DescribeSchedule(ctx context.Context, c client.Client) (ScheduleDetail, error) { + handle := c.ScheduleClient().GetHandle(ctx, d.Name) + desc, err := handle.Describe(ctx) + if err != nil { + return ScheduleDetail{}, translateSDKError("schedule-describe", err) + } + sum := ScheduleSummary{ + ID: d.Name, + WorkflowType: d.Name, + } + if desc.Schedule.State != nil { + sum.Paused = desc.Schedule.State.Paused + sum.Note = desc.Schedule.State.Note + } + det := ScheduleDetail{ + ScheduleSummary: sum, + } + if d.Schedule != nil { + det.Spec = *d.Schedule + } + if len(desc.Info.NextActionTimes) > 0 { + nt := desc.Info.NextActionTimes[0] + det.NextRunTime = &nt + } + if len(desc.Info.RecentActions) > 0 { + last := desc.Info.RecentActions[len(desc.Info.RecentActions)-1] + lt := last.ActualTime + det.LastRunTime = < + } + // RecentRuns left empty for v1; populating requires extra queries. + return det, nil +} diff --git a/temporal/job/definition_test.go b/temporal/job/definition_test.go new file mode 100644 index 0000000..b60f50e --- /dev/null +++ b/temporal/job/definition_test.go @@ -0,0 +1,139 @@ +package job + +import ( + "context" + "sync/atomic" + "testing" + + "github.com/nexus-rpc/sdk-go/nexus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.temporal.io/sdk/activity" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/workflow" +) + +func TestNew_RequiresName(t *testing.T) { + _, err := New("", "tq", + WithRegister(func(worker.Worker) {}), + WithExecute(func(context.Context, client.Client, client.StartWorkflowOptions, any) (client.WorkflowRun, error) { + return nil, nil + }), + WithNewInput(func() any { return nil }), + ) + assert.ErrorIs(t, err, ErrInvalidDefinition) +} + +func TestNew_RequiresTaskQueue(t *testing.T) { + _, err := New("name", "", + WithRegister(func(worker.Worker) {}), + WithExecute(func(context.Context, client.Client, client.StartWorkflowOptions, any) (client.WorkflowRun, error) { + return nil, nil + }), + WithNewInput(func() any { return nil }), + ) + assert.ErrorIs(t, err, ErrInvalidDefinition) +} + +func TestNew_RequiresAllClosures(t *testing.T) { + _, err := New("name", "tq") + assert.ErrorIs(t, err, ErrInvalidDefinition) + + _, err = New("name", "tq", WithRegister(func(worker.Worker) {})) + assert.ErrorIs(t, err, ErrInvalidDefinition) + + _, err = New("name", "tq", + WithRegister(func(worker.Worker) {}), + WithExecute(func(context.Context, client.Client, client.StartWorkflowOptions, any) (client.WorkflowRun, error) { + return nil, nil + }), + ) + assert.ErrorIs(t, err, ErrInvalidDefinition) +} + +func TestNew_ValidScheduleAccepted(t *testing.T) { + d, err := New("name", "tq", + WithRegister(func(worker.Worker) {}), + WithExecute(func(context.Context, client.Client, client.StartWorkflowOptions, any) (client.WorkflowRun, error) { + return nil, nil + }), + WithNewInput(func() any { return nil }), + WithSchedule(&ScheduleSpec{Interval: 1}), + WithDescription("desc"), + WithTags("a", "b"), + ) + require.NoError(t, err) + assert.Equal(t, "name", d.Name) + assert.Equal(t, "tq", d.TaskQueue) + assert.Equal(t, "desc", d.Description) + assert.Equal(t, []string{"a", "b"}, d.Tags) + require.NotNil(t, d.Schedule) +} + +func TestNew_InvalidScheduleRejected(t *testing.T) { + _, err := New("name", "tq", + WithRegister(func(worker.Worker) {}), + WithExecute(func(context.Context, client.Client, client.StartWorkflowOptions, any) (client.WorkflowRun, error) { + return nil, nil + }), + WithNewInput(func() any { return nil }), + WithSchedule(&ScheduleSpec{}), // nothing set + ) + assert.ErrorIs(t, err, ErrInvalidDefinition) +} + +// fakeWorker is a minimal stub implementing worker.Worker — only the two +// register methods are exercised in unit tests. +type fakeWorker struct { + workflowRegistrations int32 + activityRegistrations int32 +} + +func (f *fakeWorker) RegisterWorkflow(_ any) {} +func (f *fakeWorker) RegisterWorkflowWithOptions(_ any, _ workflow.RegisterOptions) { + atomic.AddInt32(&f.workflowRegistrations, 1) +} +func (f *fakeWorker) RegisterDynamicWorkflow(_ any, _ workflow.DynamicRegisterOptions) {} +func (f *fakeWorker) RegisterActivity(_ any) {} +func (f *fakeWorker) RegisterActivityWithOptions(_ any, _ activity.RegisterOptions) { + atomic.AddInt32(&f.activityRegistrations, 1) +} +func (f *fakeWorker) RegisterDynamicActivity(_ any, _ activity.DynamicRegisterOptions) {} +func (f *fakeWorker) RegisterNexusService(_ *nexus.Service) {} +func (f *fakeWorker) Start() error { return nil } +func (f *fakeWorker) Run(_ <-chan interface{}) error { return nil } +func (f *fakeWorker) Stop() {} + +func TestRegisterWorkflowOnce_Deduplicates(t *testing.T) { + w := &fakeWorker{} + RegisterWorkflowOnce(w, "myWfDedup1", func() error { return nil }, workflow.RegisterOptions{Name: "myWfDedup1"}) + RegisterWorkflowOnce(w, "myWfDedup1", func() error { return nil }, workflow.RegisterOptions{Name: "myWfDedup1"}) + RegisterWorkflowOnce(w, "myWfDedup1", func() error { return nil }, workflow.RegisterOptions{Name: "myWfDedup1"}) + assert.Equal(t, int32(1), atomic.LoadInt32(&w.workflowRegistrations)) +} + +func TestRegisterWorkflowOnce_DifferentWorkers(t *testing.T) { + w1 := &fakeWorker{} + w2 := &fakeWorker{} + RegisterWorkflowOnce(w1, "myWfMulti", func() error { return nil }, workflow.RegisterOptions{Name: "myWfMulti"}) + RegisterWorkflowOnce(w2, "myWfMulti", func() error { return nil }, workflow.RegisterOptions{Name: "myWfMulti"}) + assert.Equal(t, int32(1), atomic.LoadInt32(&w1.workflowRegistrations)) + assert.Equal(t, int32(1), atomic.LoadInt32(&w2.workflowRegistrations)) +} + +func TestGetRun(t *testing.T) { + d, err := New("name", "tq", + WithRegister(func(worker.Worker) {}), + WithExecute(func(context.Context, client.Client, client.StartWorkflowOptions, any) (client.WorkflowRun, error) { + return nil, nil + }), + WithNewInput(func() any { return nil }), + ) + require.NoError(t, err) + h := d.GetRun(nil, "wf-id", "run-id") + assert.Equal(t, "wf-id", h.WorkflowID) + assert.Equal(t, "run-id", h.RunID) + // raw is nil because client is nil; Get(ctx, &v) returns nil. + assert.NoError(t, h.Get(context.Background(), nil)) +} diff --git a/temporal/job/definition_workflow.go b/temporal/job/definition_workflow.go new file mode 100644 index 0000000..16f3fc9 --- /dev/null +++ b/temporal/job/definition_workflow.go @@ -0,0 +1,295 @@ +package job + +import ( + "context" + "fmt" + "strings" + "time" + + commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" + historypb "go.temporal.io/api/history/v1" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/sdk/client" +) + +// Describe returns the current state of one workflow run. runID "" = latest. +func (d *Definition) Describe(ctx context.Context, c client.Client, wfID, runID string) (RunDetail, error) { + resp, err := c.DescribeWorkflowExecution(ctx, wfID, runID) + if err != nil { + return RunDetail{}, translateSDKError("describe", err) + } + info := resp.GetWorkflowExecutionInfo() + if info == nil { + return RunDetail{}, fmt.Errorf("describe: empty info from server") + } + det := RunDetail{ + WorkflowID: info.GetExecution().GetWorkflowId(), + RunID: info.GetExecution().GetRunId(), + Type: info.GetType().GetName(), + TaskQueue: info.GetTaskQueue(), + Status: StatusFromSDK(info.GetStatus()), + StartTime: info.GetStartTime().AsTime(), + HistoryLength: info.GetHistoryLength(), + } + if info.GetCloseTime() != nil { + ct := info.GetCloseTime().AsTime() + det.CloseTime = &ct + det.ExecutionTime = ct.Sub(det.StartTime) + } else { + det.ExecutionTime = time.Since(det.StartTime) + } + det.Memo = decodeMemo(info.GetMemo()) + det.SearchAttributes = decodeSearchAttributes(info.GetSearchAttributes()) + return det, nil +} + +// History returns the activity-event extraction of one run's history. +func (d *Definition) History(ctx context.Context, c client.Client, wfID, runID string, opts HistoryOpts) (RunHistory, error) { + limit := opts.MaxEvents + if limit == 0 { + limit = 500 + } + iter := c.GetWorkflowHistory(ctx, wfID, runID, false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) + hist := RunHistory{WorkflowID: wfID, RunID: runID} + scheduled := map[int64]*historypb.HistoryEvent{} // event ID -> scheduled event + count := 0 + for iter.HasNext() { + if count >= limit { + hist.Truncated = true + break + } + ev, err := iter.Next() + if err != nil { + return hist, translateSDKError("history", err) + } + count++ + switch attrs := ev.Attributes.(type) { + case *historypb.HistoryEvent_ActivityTaskScheduledEventAttributes: + scheduled[ev.EventId] = ev + case *historypb.HistoryEvent_ActivityTaskStartedEventAttributes: + sched := scheduled[attrs.ActivityTaskStartedEventAttributes.GetScheduledEventId()] + if sched != nil { + hist.Activities = append(hist.Activities, buildActivityEventFromStarted(sched, ev)) + } + case *historypb.HistoryEvent_ActivityTaskCompletedEventAttributes: + updateLatestRunning(&hist, ev.GetEventTime().AsTime(), ActivityCompleted, payloadToBytes(attrs.ActivityTaskCompletedEventAttributes.GetResult()), "") + case *historypb.HistoryEvent_ActivityTaskFailedEventAttributes: + updateLatestRunning(&hist, ev.GetEventTime().AsTime(), ActivityFailed, nil, attrs.ActivityTaskFailedEventAttributes.GetFailure().GetMessage()) + case *historypb.HistoryEvent_ActivityTaskTimedOutEventAttributes: + updateLatestRunning(&hist, ev.GetEventTime().AsTime(), ActivityTimedOut, nil, attrs.ActivityTaskTimedOutEventAttributes.GetFailure().GetMessage()) + case *historypb.HistoryEvent_ActivityTaskCanceledEventAttributes: + updateLatestRunning(&hist, ev.GetEventTime().AsTime(), ActivityCanceled, nil, "") + } + } + return hist, nil +} + +func buildActivityEventFromStarted(scheduled, started *historypb.HistoryEvent) ActivityEvent { + a := scheduled.GetActivityTaskScheduledEventAttributes() + return ActivityEvent{ + Name: a.GetActivityType().GetName(), + Status: ActivityStarted, + Attempt: started.GetActivityTaskStartedEventAttributes().GetAttempt(), + StartTime: started.GetEventTime().AsTime(), + Input: payloadToBytes(a.GetInput()), + } +} + +// updateLatestRunning closes out the latest ActivityEvent in hist that is still +// in ActivityStarted state. Temporal emits events in monotonic EventID order so +// the latest still-running activity is the one being closed. +func updateLatestRunning(hist *RunHistory, closeTime time.Time, status ActivityStatus, result []byte, errMsg string) { + for i := len(hist.Activities) - 1; i >= 0; i-- { + a := &hist.Activities[i] + if a.Status == ActivityStarted { + a.Status = status + a.CloseTime = closeTime + a.Duration = closeTime.Sub(a.StartTime) + a.Result = result + a.Error = errMsg + return + } + } +} + +func payloadToBytes(p *commonpb.Payloads) []byte { + if p == nil || len(p.GetPayloads()) == 0 { + return nil + } + var out []byte + for _, pl := range p.GetPayloads() { + out = append(out, pl.GetData()...) + } + return out +} + +func decodeMemo(p *commonpb.Memo) map[string]any { + if p == nil || len(p.GetFields()) == 0 { + return nil + } + out := make(map[string]any, len(p.GetFields())) + for k, v := range p.GetFields() { + out[k] = string(v.GetData()) + } + return out +} + +func decodeSearchAttributes(p *commonpb.SearchAttributes) map[string]any { + if p == nil || len(p.GetIndexedFields()) == 0 { + return nil + } + out := make(map[string]any, len(p.GetIndexedFields())) + for k, v := range p.GetIndexedFields() { + out[k] = string(v.GetData()) + } + return out +} + +// Cancel requests cancellation of a workflow run. +func (d *Definition) Cancel(ctx context.Context, c client.Client, wfID, runID string) error { + return translateSDKError("cancel", c.CancelWorkflow(ctx, wfID, runID)) +} + +// Terminate hard-stops a workflow run. +func (d *Definition) Terminate(ctx context.Context, c client.Client, wfID, runID, reason string) error { + return translateSDKError("terminate", c.TerminateWorkflow(ctx, wfID, runID, reason)) +} + +// Signal sends a signal to a workflow run. +func (d *Definition) Signal(ctx context.Context, c client.Client, wfID, runID, signalName string, payload any) error { + return translateSDKError("signal", c.SignalWorkflow(ctx, wfID, runID, signalName, payload)) +} + +// Query invokes a synchronous query against a workflow run. +func (d *Definition) Query(ctx context.Context, c client.Client, wfID, runID, queryType string, args ...any) (any, error) { + resp, err := c.QueryWorkflow(ctx, wfID, runID, queryType, args...) + if err != nil { + return nil, translateSDKError("query", err) + } + var out any + if err := resp.Get(&out); err != nil { + return nil, translateSDKError("query-decode", err) + } + return out, nil +} + +// ListRuns lists workflow executions of this Definition, scoped by +// "WorkflowId STARTS_WITH '-'". +func (d *Definition) ListRuns(ctx context.Context, c client.Client, opts ListOpts) (RunPage, error) { + pageSize := opts.PageSize + if pageSize <= 0 { + pageSize = 100 + } + if pageSize > 1000 { + pageSize = 1000 + } + q := d.scopedQuery(opts.Status, opts.TimeRange) + resp, err := c.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{ + PageSize: int32(pageSize), + NextPageToken: opts.PageToken, + Query: q, + }) + if err != nil { + return RunPage{}, translateSDKError("list-runs", err) + } + page := RunPage{NextPageToken: resp.GetNextPageToken()} + for _, info := range resp.GetExecutions() { + s := RunSummary{ + WorkflowID: info.GetExecution().GetWorkflowId(), + RunID: info.GetExecution().GetRunId(), + Type: info.GetType().GetName(), + Status: StatusFromSDK(info.GetStatus()), + StartTime: info.GetStartTime().AsTime(), + TaskQueue: info.GetTaskQueue(), + } + if info.GetCloseTime() != nil { + ct := info.GetCloseTime().AsTime() + s.CloseTime = &ct + } + page.Runs = append(page.Runs, s) + } + return page, nil +} + +// Stats returns running/completed-today/failed-today counts scoped to this +// Definition's workflow IDs. +func (d *Definition) Stats(ctx context.Context, c client.Client, opts StatsOpts) (DefinitionStats, error) { + now := time.Now() + loc := opts.Location + if loc == nil { + loc = time.UTC + } + startOfDay := time.Date(now.In(loc).Year(), now.In(loc).Month(), now.In(loc).Day(), 0, 0, 0, 0, loc) + prefix := fmt.Sprintf("WorkflowId STARTS_WITH %q", d.Name+"-") + + countQ := func(q string) (int64, error) { + resp, err := c.CountWorkflow(ctx, &workflowservice.CountWorkflowExecutionsRequest{Query: q}) + if err != nil { + return 0, translateSDKError("count", err) + } + return resp.GetCount(), nil + } + running, err := countQ(prefix + " AND ExecutionStatus = \"Running\"") + if err != nil { + return DefinitionStats{}, err + } + completed, err := countQ(prefix + fmt.Sprintf(" AND ExecutionStatus = \"Completed\" AND CloseTime >= %q", startOfDay.Format(time.RFC3339))) + if err != nil { + return DefinitionStats{}, err + } + failed, err := countQ(prefix + fmt.Sprintf(" AND ExecutionStatus = \"Failed\" AND CloseTime >= %q", startOfDay.Format(time.RFC3339))) + if err != nil { + return DefinitionStats{}, err + } + return DefinitionStats{Running: running, CompletedToday: completed, FailedToday: failed, AsOf: now}, nil +} + +// scopedQuery builds a visibility query that scopes by WorkflowId prefix and +// optionally filters by status / time range. +func (d *Definition) scopedQuery(statuses []Status, tr *TimeRange) string { + var parts []string + parts = append(parts, fmt.Sprintf("WorkflowId STARTS_WITH %q", d.Name+"-")) + if len(statuses) > 0 { + statusStrs := make([]string, 0, len(statuses)) + for _, s := range statuses { + statusStrs = append(statusStrs, fmt.Sprintf("%q", titleCase(s.String()))) + } + parts = append(parts, fmt.Sprintf("ExecutionStatus IN (%s)", strings.Join(statusStrs, ", "))) + } + if tr != nil { + if !tr.Start.IsZero() { + parts = append(parts, fmt.Sprintf("StartTime >= %q", tr.Start.Format(time.RFC3339))) + } + if !tr.End.IsZero() { + parts = append(parts, fmt.Sprintf("StartTime <= %q", tr.End.Format(time.RFC3339))) + } + } + return strings.Join(parts, " AND ") +} + +// titleCase upper-cases the first rune and converts underscore-separated words +// to CamelCase. Used to map status strings like "continued_as_new" to +// "ContinuedAsNew" for Temporal visibility-query status values. ASCII-only +// since status names are fixed and ASCII. +func titleCase(s string) string { + if s == "" { + return s + } + b := []byte(s) + var out []byte + upperNext := true + for _, c := range b { + if c == '_' { + upperNext = true + continue + } + if upperNext && c >= 'a' && c <= 'z' { + out = append(out, c-('a'-'A')) + upperNext = false + continue + } + out = append(out, c) + } + return string(out) +} diff --git a/temporal/job/doc.go b/temporal/job/doc.go new file mode 100644 index 0000000..17a6731 --- /dev/null +++ b/temporal/job/doc.go @@ -0,0 +1,22 @@ +// Package job provides a type-focused abstraction for one registered Temporal +// workflow. A Definition holds the metadata, lifecycle hooks, and per-job +// operations for a single workflow: register on a worker, execute by name, +// attach a schedule, describe runs, control lifecycle. +// +// The package coexists with pkg/temporal's existing managers (WorkflowManager, +// ScheduleManager). Use a Definition when you have a typed handle to "your" +// workflow; use the namespace-wide managers when you need to inspect +// workflows you didn't register yourself. +// +// Typical use: +// +// def, err := job.New("orders-sync", "sync-orders", +// job.WithRegister(func(w worker.Worker) { /* ... */ }), +// job.WithExecute(func(ctx, c, opts, in) (client.WorkflowRun, error) { /* ... */ }), +// job.WithNewInput(func() any { return &OrdersInput{} }), +// job.WithSchedule(&job.ScheduleSpec{Interval: time.Hour}), +// ) +// def.Register(worker) +// run, _ := def.Execute(ctx, c, &OrdersInput{...}) +// detail, _ := def.Describe(ctx, c, run.WorkflowID, run.RunID) +package job diff --git a/temporal/job/errors.go b/temporal/job/errors.go new file mode 100644 index 0000000..acfcf99 --- /dev/null +++ b/temporal/job/errors.go @@ -0,0 +1,38 @@ +package job + +import ( + "errors" + "fmt" + + "go.temporal.io/api/serviceerror" +) + +var ( + // Lookup + ErrNotFound = errors.New("job: not found") + ErrDuplicateName = errors.New("job: duplicate name") + ErrInvalidDefinition = errors.New("job: invalid definition") + + // Lifecycle + ErrAlreadyClosed = errors.New("job: workflow already closed") + ErrNoSchedule = errors.New("job: no schedule configured") + ErrScheduleNotFound = errors.New("job: schedule not found") + + // Wiring + ErrNotRegistered = errors.New("job: register not configured") +) + +// translateSDKError wraps a Temporal SDK error with a typed sentinel where a +// matching one exists; otherwise wraps with the operation name for context. +// Always preserves the original error chain so callers can errors.As to the +// SDK types when needed. +func translateSDKError(op string, err error) error { + if err == nil { + return nil + } + var notFound *serviceerror.NotFound + if errors.As(err, ¬Found) { + return fmt.Errorf("%s: %w: %w", op, ErrNotFound, err) + } + return fmt.Errorf("%s: %w", op, err) +} diff --git a/temporal/job/errors_test.go b/temporal/job/errors_test.go new file mode 100644 index 0000000..e7ef6ea --- /dev/null +++ b/temporal/job/errors_test.go @@ -0,0 +1,28 @@ +package job + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "go.temporal.io/api/serviceerror" +) + +func TestTranslateSDKError_NotFound(t *testing.T) { + sdk := serviceerror.NewNotFound("workflow not found") + got := translateSDKError("describe", sdk) + assert.True(t, errors.Is(got, ErrNotFound)) + assert.True(t, errors.Is(got, sdk), "preserves underlying SDK error") +} + +func TestTranslateSDKError_Passthrough(t *testing.T) { + other := errors.New("plain error") + got := translateSDKError("cancel", other) + assert.False(t, errors.Is(got, ErrNotFound)) + assert.True(t, errors.Is(got, other)) +} + +func TestTranslateSDKError_Nil(t *testing.T) { + got := translateSDKError("op", nil) + assert.NoError(t, got) +} diff --git a/temporal/job/options.go b/temporal/job/options.go new file mode 100644 index 0000000..36fdf2c --- /dev/null +++ b/temporal/job/options.go @@ -0,0 +1,109 @@ +package job + +import ( + "time" + + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/temporal" +) + +// TimeRange filters by a start-time inclusive range. +type TimeRange struct { + Start time.Time + End time.Time +} + +// ListOpts configures Definition.ListRuns. +type ListOpts struct { + Status []Status // empty = any + TimeRange *TimeRange // by StartTime + PageSize int // default 100, max 1000 + PageToken []byte +} + +// StatsOpts configures Definition.Stats. +type StatsOpts struct { + TodayOnly bool // default false — set true for "running + closed today" + Location *time.Location // if nil and TodayOnly: UTC; otherwise this zone's calendar day +} + +// HistoryOpts configures Definition.History. +type HistoryOpts struct { + MaxEvents int // default 500 in the method; 0 = no cap (caller takes responsibility) +} + +// ScheduleListOpts configures Registry.ListSchedules (future) and individual +// schedule paging. +type ScheduleListOpts struct { + PageSize int + PageToken []byte +} + +// executeConfig accumulates state across ExecuteOption calls. +type executeConfig struct { + workflowID string + timeout time.Duration + taskTimeout time.Duration + retryPolicy *temporal.RetryPolicy + memo map[string]any +} + +// ExecuteOption customizes a single Definition.Execute call. +type ExecuteOption func(*executeConfig) + +// WithWorkflowID overrides the default ID of "-". +func WithWorkflowID(id string) ExecuteOption { + return func(c *executeConfig) { c.workflowID = id } +} + +// WithTimeout sets WorkflowExecutionTimeout. +func WithTimeout(d time.Duration) ExecuteOption { + return func(c *executeConfig) { c.timeout = d } +} + +// WithTaskTimeout sets WorkflowTaskTimeout. +func WithTaskTimeout(d time.Duration) ExecuteOption { + return func(c *executeConfig) { c.taskTimeout = d } +} + +// WithRetryPolicy sets the workflow-level retry policy. +func WithRetryPolicy(p *temporal.RetryPolicy) ExecuteOption { + return func(c *executeConfig) { c.retryPolicy = p } +} + +// WithMemo attaches a memo to the workflow execution. +func WithMemo(m map[string]any) ExecuteOption { + return func(c *executeConfig) { c.memo = m } +} + +// Note: search attributes are intentionally not exposed here yet. The Temporal +// SDK's legacy map[string]any path is deprecated in favor of TypedSearchAttributes, +// which requires per-attribute schema definitions. Callers that need search +// attributes today can construct their own client.StartWorkflowOptions and +// call client.ExecuteWorkflow directly. A future revision will add typed +// support. + +// apply builds a client.StartWorkflowOptions from defaults + accumulated options. +func (c executeConfig) apply(defaultID, taskQueue string) client.StartWorkflowOptions { + id := c.workflowID + if id == "" { + id = defaultID + } + opts := client.StartWorkflowOptions{ + ID: id, + TaskQueue: taskQueue, + } + if c.timeout > 0 { + opts.WorkflowExecutionTimeout = c.timeout + } + if c.taskTimeout > 0 { + opts.WorkflowTaskTimeout = c.taskTimeout + } + if c.retryPolicy != nil { + opts.RetryPolicy = c.retryPolicy + } + if c.memo != nil { + opts.Memo = c.memo + } + return opts +} diff --git a/temporal/job/registry.go b/temporal/job/registry.go new file mode 100644 index 0000000..ca144d0 --- /dev/null +++ b/temporal/job/registry.go @@ -0,0 +1,105 @@ +package job + +import ( + "context" + "fmt" + "sort" + + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" +) + +// Registry maps logical job names to Definitions. Construction does not +// validate seed Definitions twice — they were validated by New already. +type Registry struct { + defs map[string]*Definition +} + +// NewRegistry creates a Registry, optionally seeded. Duplicates among seeds +// silently use the later value (validate input upstream if that matters). +func NewRegistry(defs ...*Definition) *Registry { + r := &Registry{defs: make(map[string]*Definition, len(defs))} + for _, d := range defs { + if d == nil || d.Name == "" { + continue + } + r.defs[d.Name] = d + } + return r +} + +// Add inserts a Definition. Returns ErrDuplicateName on conflict, +// ErrInvalidDefinition if the Definition is nil or missing required fields. +func (r *Registry) Add(d *Definition) error { + if d == nil { + return fmt.Errorf("%w: nil definition", ErrInvalidDefinition) + } + if d.Name == "" || d.TaskQueue == "" || d.register == nil || d.execute == nil || d.newInput == nil { + return fmt.Errorf("%w: definition fields incomplete", ErrInvalidDefinition) + } + if _, exists := r.defs[d.Name]; exists { + return fmt.Errorf("%w: %s", ErrDuplicateName, d.Name) + } + r.defs[d.Name] = d + return nil +} + +// Get returns the Definition with the given name and a boolean indicating +// whether it was found. +func (r *Registry) Get(name string) (*Definition, bool) { + d, ok := r.defs[name] + return d, ok +} + +// MustGet returns the Definition with the given name. Panics with +// fmt.Errorf("%w: %s", ErrNotFound, name) if absent. +func (r *Registry) MustGet(name string) *Definition { + d, ok := r.defs[name] + if !ok { + panic(fmt.Errorf("%w: %s", ErrNotFound, name)) + } + return d +} + +// List returns all Definitions, sorted by Name. +func (r *Registry) List() []*Definition { + names := r.Names() + out := make([]*Definition, len(names)) + for i, n := range names { + out[i] = r.defs[n] + } + return out +} + +// Names returns all registered names, sorted alphabetically. +func (r *Registry) Names() []string { + out := make([]string, 0, len(r.defs)) + for name := range r.defs { + out = append(out, name) + } + sort.Strings(out) + return out +} + +// RegisterAll registers every Definition on the given worker. Idempotent: +// underlying workflow/activity types are deduplicated via +// RegisterWorkflowOnce / RegisterActivityOnce in builder closures. +func (r *Registry) RegisterAll(w worker.Worker) { + for _, d := range r.List() { + d.Register(w) + } +} + +// ApplySchedules creates or updates Temporal schedules for every Definition +// that has one. Returns the first error encountered (does not roll back). +func (r *Registry) ApplySchedules(ctx context.Context, c client.Client) error { + for _, d := range r.List() { + if d.Schedule == nil { + continue + } + if err := d.ApplySchedule(ctx, c); err != nil { + return fmt.Errorf("apply schedule for %q: %w", d.Name, err) + } + } + return nil +} diff --git a/temporal/job/registry_integration_test.go b/temporal/job/registry_integration_test.go new file mode 100644 index 0000000..5a57248 --- /dev/null +++ b/temporal/job/registry_integration_test.go @@ -0,0 +1,68 @@ +//go:build integration + +package job + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/workflow" + + "github.com/jasoet/pkg/v2/temporal/testcontainer" +) + +func TestIntegration_Registry_RegisterAll_Deduplicates(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + defer cancel() + + tc, c, cleanup, err := testcontainer.Setup(ctx, testcontainer.ClientConfig{Namespace: "default"}, testcontainer.Options{}) + require.NoError(t, err) + defer cleanup() + _ = tc + + sharedWf := func(workflow.Context, string) (string, error) { return "ok", nil } + + mk := func(name string) *Definition { + d, err := New(name, "shared-tq", + WithRegister(func(w worker.Worker) { + RegisterWorkflowOnce(w, "shared", sharedWf, workflow.RegisterOptions{Name: "shared"}) + }), + WithExecute(func(ctx context.Context, c client.Client, opts client.StartWorkflowOptions, in any) (client.WorkflowRun, error) { + return c.ExecuteWorkflow(ctx, opts, "shared", in) + }), + WithNewInput(func() any { var s string; return &s }), + ) + require.NoError(t, err) + return d + } + + r := NewRegistry(mk("alpha"), mk("beta")) + w := worker.New(c, "shared-tq", worker.Options{}) + r.RegisterAll(w) // would panic on duplicate workflow type without dedup + + require.NoError(t, w.Start()) + defer w.Stop() + + for _, name := range []string{"alpha", "beta"} { + run, err := r.MustGet(name).Execute(ctx, c, "in") + require.NoError(t, err) + var out string + require.NoError(t, run.Get(ctx, &out)) + assert.Equal(t, "ok", out) + } + + time.Sleep(2 * time.Second) + + alphaPage, err := r.MustGet("alpha").ListRuns(ctx, c, ListOpts{}) + require.NoError(t, err) + assert.GreaterOrEqual(t, len(alphaPage.Runs), 1) + for _, run := range alphaPage.Runs { + assert.True(t, strings.HasPrefix(run.WorkflowID, "alpha-"), "expected alpha- prefix, got %s", run.WorkflowID) + } +} diff --git a/temporal/job/registry_test.go b/temporal/job/registry_test.go new file mode 100644 index 0000000..808a489 --- /dev/null +++ b/temporal/job/registry_test.go @@ -0,0 +1,69 @@ +package job + +import ( + "context" + "sort" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" +) + +func newTestDef(t *testing.T, name string) *Definition { + t.Helper() + d, err := New(name, "tq-"+name, + WithRegister(func(worker.Worker) {}), + WithExecute(func(context.Context, client.Client, client.StartWorkflowOptions, any) (client.WorkflowRun, error) { + return nil, nil + }), + WithNewInput(func() any { return nil }), + ) + require.NoError(t, err) + return d +} + +func TestRegistry_AddAndGet(t *testing.T) { + r := NewRegistry() + d := newTestDef(t, "alpha") + require.NoError(t, r.Add(d)) + got, ok := r.Get("alpha") + assert.True(t, ok) + assert.Same(t, d, got) +} + +func TestRegistry_AddDuplicate(t *testing.T) { + r := NewRegistry(newTestDef(t, "a")) + err := r.Add(newTestDef(t, "a")) + assert.ErrorIs(t, err, ErrDuplicateName) +} + +func TestRegistry_AddNilOrInvalid(t *testing.T) { + r := NewRegistry() + assert.ErrorIs(t, r.Add(nil), ErrInvalidDefinition) + assert.ErrorIs(t, r.Add(&Definition{}), ErrInvalidDefinition) +} + +func TestRegistry_MustGet_Missing(t *testing.T) { + r := NewRegistry() + assert.Panics(t, func() { _ = r.MustGet("missing") }) +} + +func TestRegistry_List_Sorted(t *testing.T) { + r := NewRegistry(newTestDef(t, "charlie"), newTestDef(t, "alpha"), newTestDef(t, "bravo")) + got := r.Names() + assert.True(t, sort.StringsAreSorted(got)) + assert.Equal(t, []string{"alpha", "bravo", "charlie"}, got) + + list := r.List() + require.Len(t, list, 3) + for i, d := range list { + assert.Equal(t, got[i], d.Name) + } +} + +func TestRegistry_NewWithSeed(t *testing.T) { + r := NewRegistry(newTestDef(t, "a"), newTestDef(t, "b")) + assert.Len(t, r.List(), 2) +} diff --git a/temporal/job/result.go b/temporal/job/result.go new file mode 100644 index 0000000..f54289f --- /dev/null +++ b/temporal/job/result.go @@ -0,0 +1,113 @@ +package job + +import ( + "context" + "time" + + "go.temporal.io/sdk/client" +) + +// RunHandle is a lightweight handle to one workflow run. Returned by +// Definition.Execute and Definition.GetRun. +type RunHandle struct { + WorkflowID string + RunID string + raw client.WorkflowRun +} + +// Get blocks until the workflow completes and unmarshals its result into +// valuePtr (must be a non-nil pointer). Returns the workflow's error if it +// failed. Returns nil if the handle has no underlying run (e.g., constructed +// from GetRun on an unknown ID). +func (h RunHandle) Get(ctx context.Context, valuePtr any) error { + if h.raw == nil { + return nil + } + return h.raw.Get(ctx, valuePtr) +} + +// RunDetail is the full description of one workflow run. +type RunDetail struct { + WorkflowID string + RunID string + Type string + TaskQueue string + Status Status + StartTime time.Time + CloseTime *time.Time // nil if still running + ExecutionTime time.Duration + HistoryLength int64 + Memo map[string]any + SearchAttributes map[string]any +} + +// RunHistory is the activity-event extraction of one run's history, bounded +// by HistoryOpts.MaxEvents. +type RunHistory struct { + WorkflowID string + RunID string + Activities []ActivityEvent + Truncated bool +} + +// ActivityEvent describes one activity attempt within a workflow run. +type ActivityEvent struct { + Name string + Status ActivityStatus + Attempt int32 + StartTime time.Time + CloseTime time.Time + Duration time.Duration + Input []byte // raw payload; caller deserializes + Result []byte // raw payload; nil on failure + Error string // empty on success +} + +// RunPage is one page of ListRuns results. +type RunPage struct { + Runs []RunSummary + NextPageToken []byte +} + +// RunSummary is one row in a list of runs. +type RunSummary struct { + WorkflowID string + RunID string + Type string + Status Status + StartTime time.Time + CloseTime *time.Time + TaskQueue string +} + +// DefinitionStats is per-Definition aggregate counters. +type DefinitionStats struct { + Running int64 + CompletedToday int64 + FailedToday int64 + AsOf time.Time +} + +// ScheduleSummary is a lightweight schedule summary. +type ScheduleSummary struct { + ID string + WorkflowType string + Paused bool + NextRunTime *time.Time + LastRunTime *time.Time + Note string +} + +// ScheduleDetail is the full schedule description. +type ScheduleDetail struct { + ScheduleSummary + Spec ScheduleSpec + RecentRuns []RunSummary +} + +// TaskQueueInfo describes a task queue's pollers and reachability. +type TaskQueueInfo struct { + Name string + WorkerCount int + // Future: PollerDetails, Reachability, etc. +} diff --git a/temporal/job/schedule_integration_test.go b/temporal/job/schedule_integration_test.go new file mode 100644 index 0000000..d1e1a50 --- /dev/null +++ b/temporal/job/schedule_integration_test.go @@ -0,0 +1,76 @@ +//go:build integration + +package job + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/workflow" + + "github.com/jasoet/pkg/v2/temporal/testcontainer" +) + +func TestIntegration_Schedule_FullLifecycle(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + defer cancel() + + tc, c, cleanup, err := testcontainer.Setup(ctx, testcontainer.ClientConfig{Namespace: "default"}, testcontainer.Options{}) + require.NoError(t, err) + defer cleanup() + _ = tc + + wf := func(workflow.Context, string) error { return nil } + + d, err := New("sched-test", "sched-tq", + WithRegister(func(w worker.Worker) { + RegisterWorkflowOnce(w, "sched-test", wf, workflow.RegisterOptions{Name: "sched-test"}) + }), + WithExecute(func(ctx context.Context, c client.Client, opts client.StartWorkflowOptions, in any) (client.WorkflowRun, error) { + return c.ExecuteWorkflow(ctx, opts, "sched-test", in) + }), + WithNewInput(func() any { var s string; return &s }), + WithSchedule(&ScheduleSpec{Interval: time.Hour, Paused: true, Note: "initial"}), + ) + require.NoError(t, err) + + w := worker.New(c, "sched-tq", worker.Options{}) + d.Register(w) + require.NoError(t, w.Start()) + defer w.Stop() + + // Apply + require.NoError(t, d.ApplySchedule(ctx, c)) + defer d.DeleteSchedule(ctx, c) //nolint:errcheck + + // Describe + desc, err := d.DescribeSchedule(ctx, c) + require.NoError(t, err) + assert.True(t, desc.Paused) + assert.Equal(t, "initial", desc.Note) + + // Resume + require.NoError(t, d.ResumeSchedule(ctx, c, "resumed")) + desc, err = d.DescribeSchedule(ctx, c) + require.NoError(t, err) + assert.False(t, desc.Paused) + + // Pause + require.NoError(t, d.PauseSchedule(ctx, c, "paused again")) + desc, err = d.DescribeSchedule(ctx, c) + require.NoError(t, err) + assert.True(t, desc.Paused) + + // Trigger (action runs once even though paused) + require.NoError(t, d.TriggerSchedule(ctx, c)) + + // Delete + require.NoError(t, d.DeleteSchedule(ctx, c)) + _, err = d.DescribeSchedule(ctx, c) + assert.Error(t, err) +} diff --git a/temporal/job/schedule_spec.go b/temporal/job/schedule_spec.go new file mode 100644 index 0000000..8ebe8a6 --- /dev/null +++ b/temporal/job/schedule_spec.go @@ -0,0 +1,140 @@ +package job + +import ( + "errors" + "time" + + enumspb "go.temporal.io/api/enums/v1" + "go.temporal.io/sdk/client" +) + +// OverlapPolicy controls how the scheduler handles a new trigger when a +// previous run is still in flight. Values mirror Temporal's ScheduleOverlapPolicy. +type OverlapPolicy int + +const ( + OverlapSkip OverlapPolicy = iota // default — drop new trigger if previous still running + OverlapBufferOne // queue one trigger; drop further + OverlapBufferAll // queue all triggers + OverlapCancelOther // cancel running, start new + OverlapTerminateOther // terminate running, start new + OverlapAllowAll // start in parallel +) + +// ToSDK converts the OverlapPolicy to the equivalent Temporal SDK enum value. +func (p OverlapPolicy) ToSDK() enumspb.ScheduleOverlapPolicy { + switch p { + case OverlapBufferOne: + return enumspb.SCHEDULE_OVERLAP_POLICY_BUFFER_ONE + case OverlapBufferAll: + return enumspb.SCHEDULE_OVERLAP_POLICY_BUFFER_ALL + case OverlapCancelOther: + return enumspb.SCHEDULE_OVERLAP_POLICY_CANCEL_OTHER + case OverlapTerminateOther: + return enumspb.SCHEDULE_OVERLAP_POLICY_TERMINATE_OTHER + case OverlapAllowAll: + return enumspb.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL + default: + return enumspb.SCHEDULE_OVERLAP_POLICY_SKIP + } +} + +// ScheduleRange mirrors Temporal's ScheduleRange (Start/End/Step int). +type ScheduleRange struct { + Start int + End int + Step int +} + +// CalendarSpec mirrors Temporal's ScheduleCalendarSpec. +type CalendarSpec struct { + Second []ScheduleRange + Minute []ScheduleRange + Hour []ScheduleRange + DayOfMonth []ScheduleRange + Month []ScheduleRange + Year []ScheduleRange + DayOfWeek []ScheduleRange + Comment string +} + +// ScheduleSpec describes when a Definition's workflow should fire automatically. +// Exactly one of Interval / Cron / Calendar must be set. +type ScheduleSpec struct { + Interval time.Duration + Cron string + Calendar []CalendarSpec + + Overlap OverlapPolicy + Jitter time.Duration + Paused bool + Note string + CatchupWindow time.Duration +} + +func (s *ScheduleSpec) validate() error { + if s == nil { + return errors.New("job: ScheduleSpec is nil") + } + set := 0 + if s.Interval > 0 { + set++ + } + if s.Cron != "" { + set++ + } + if len(s.Calendar) > 0 { + set++ + } + if set == 0 { + return errors.New("job: ScheduleSpec requires one of Interval, Cron, or Calendar") + } + if set > 1 { + return errors.New("job: ScheduleSpec must set exactly one of Interval, Cron, or Calendar") + } + return nil +} + +func (s *ScheduleSpec) toSDKSpec() (client.ScheduleSpec, error) { + if err := s.validate(); err != nil { + return client.ScheduleSpec{}, err + } + spec := client.ScheduleSpec{ + Jitter: s.Jitter, + } + switch { + case s.Interval > 0: + spec.Intervals = []client.ScheduleIntervalSpec{{Every: s.Interval}} + case s.Cron != "": + spec.CronExpressions = []string{s.Cron} + case len(s.Calendar) > 0: + for _, c := range s.Calendar { + spec.Calendars = append(spec.Calendars, calendarToSDK(c)) + } + } + return spec, nil +} + +func calendarToSDK(c CalendarSpec) client.ScheduleCalendarSpec { + return client.ScheduleCalendarSpec{ + Second: rangesToSDK(c.Second), + Minute: rangesToSDK(c.Minute), + Hour: rangesToSDK(c.Hour), + DayOfMonth: rangesToSDK(c.DayOfMonth), + Month: rangesToSDK(c.Month), + Year: rangesToSDK(c.Year), + DayOfWeek: rangesToSDK(c.DayOfWeek), + Comment: c.Comment, + } +} + +func rangesToSDK(rs []ScheduleRange) []client.ScheduleRange { + if len(rs) == 0 { + return nil + } + out := make([]client.ScheduleRange, len(rs)) + for i, r := range rs { + out[i] = client.ScheduleRange{Start: r.Start, End: r.End, Step: r.Step} + } + return out +} diff --git a/temporal/job/schedule_spec_test.go b/temporal/job/schedule_spec_test.go new file mode 100644 index 0000000..bc57e15 --- /dev/null +++ b/temporal/job/schedule_spec_test.go @@ -0,0 +1,72 @@ +package job + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestScheduleSpec_Validate(t *testing.T) { + t.Run("interval is valid", func(t *testing.T) { + s := &ScheduleSpec{Interval: time.Hour} + assert.NoError(t, s.validate()) + }) + t.Run("cron is valid", func(t *testing.T) { + s := &ScheduleSpec{Cron: "0 0 * * *"} + assert.NoError(t, s.validate()) + }) + t.Run("calendar is valid", func(t *testing.T) { + s := &ScheduleSpec{Calendar: []CalendarSpec{{Hour: []ScheduleRange{{Start: 0, End: 23, Step: 1}}}}} + assert.NoError(t, s.validate()) + }) + t.Run("nothing set is invalid", func(t *testing.T) { + s := &ScheduleSpec{} + assert.Error(t, s.validate()) + }) + t.Run("two set is invalid", func(t *testing.T) { + s := &ScheduleSpec{Interval: time.Hour, Cron: "0 * * * *"} + assert.Error(t, s.validate()) + }) +} + +func TestScheduleSpec_ToSDKSpec_Interval(t *testing.T) { + s := &ScheduleSpec{Interval: 15 * time.Minute} + sdk, err := s.toSDKSpec() + require.NoError(t, err) + require.Len(t, sdk.Intervals, 1) + assert.Equal(t, 15*time.Minute, sdk.Intervals[0].Every) +} + +func TestScheduleSpec_ToSDKSpec_Cron(t *testing.T) { + s := &ScheduleSpec{Cron: "0 */6 * * *"} + sdk, err := s.toSDKSpec() + require.NoError(t, err) + require.Len(t, sdk.CronExpressions, 1) + assert.Equal(t, "0 */6 * * *", sdk.CronExpressions[0]) +} + +func TestScheduleSpec_ToSDKSpec_Calendar(t *testing.T) { + s := &ScheduleSpec{ + Calendar: []CalendarSpec{{ + Hour: []ScheduleRange{{Start: 9, End: 17, Step: 1}}, + Minute: []ScheduleRange{{Start: 0, End: 0, Step: 1}}, + Comment: "business hours", + }}, + } + sdk, err := s.toSDKSpec() + require.NoError(t, err) + require.Len(t, sdk.Calendars, 1) + assert.Equal(t, "business hours", sdk.Calendars[0].Comment) + assert.Len(t, sdk.Calendars[0].Hour, 1) + assert.Equal(t, 9, sdk.Calendars[0].Hour[0].Start) + assert.Equal(t, 17, sdk.Calendars[0].Hour[0].End) +} + +func TestScheduleSpec_ToSDKSpec_Jitter(t *testing.T) { + s := &ScheduleSpec{Interval: time.Hour, Jitter: 30 * time.Second} + sdk, err := s.toSDKSpec() + require.NoError(t, err) + assert.Equal(t, 30*time.Second, sdk.Jitter) +} diff --git a/temporal/job/status.go b/temporal/job/status.go new file mode 100644 index 0000000..5892909 --- /dev/null +++ b/temporal/job/status.go @@ -0,0 +1,105 @@ +package job + +import ( + enumspb "go.temporal.io/api/enums/v1" +) + +// Status represents a workflow execution status, mirrored from Temporal's +// WorkflowExecutionStatus enum for use without leaking SDK enum types. +type Status int + +const ( + StatusUnknown Status = iota + StatusRunning + StatusCompleted + StatusFailed + StatusCanceled + StatusTerminated + StatusContinuedAsNew + StatusTimedOut +) + +// String returns the lowercase snake_case name of the status. +func (s Status) String() string { + switch s { + case StatusRunning: + return "running" + case StatusCompleted: + return "completed" + case StatusFailed: + return "failed" + case StatusCanceled: + return "canceled" + case StatusTerminated: + return "terminated" + case StatusContinuedAsNew: + return "continued_as_new" + case StatusTimedOut: + return "timed_out" + default: + return "unknown" + } +} + +// IsTerminal reports whether the status represents a closed (finished) workflow. +func (s Status) IsTerminal() bool { + switch s { + case StatusCompleted, StatusFailed, StatusCanceled, StatusTerminated, StatusContinuedAsNew, StatusTimedOut: + return true + default: + return false + } +} + +// StatusFromSDK maps a Temporal SDK WorkflowExecutionStatus to a job.Status. +func StatusFromSDK(s enumspb.WorkflowExecutionStatus) Status { + switch s { + case enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING: + return StatusRunning + case enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED: + return StatusCompleted + case enumspb.WORKFLOW_EXECUTION_STATUS_FAILED: + return StatusFailed + case enumspb.WORKFLOW_EXECUTION_STATUS_CANCELED: + return StatusCanceled + case enumspb.WORKFLOW_EXECUTION_STATUS_TERMINATED: + return StatusTerminated + case enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW: + return StatusContinuedAsNew + case enumspb.WORKFLOW_EXECUTION_STATUS_TIMED_OUT: + return StatusTimedOut + default: + return StatusUnknown + } +} + +// ActivityStatus mirrors Temporal's per-activity outcome. +type ActivityStatus int + +const ( + ActivityScheduled ActivityStatus = iota + ActivityStarted + ActivityCompleted + ActivityFailed + ActivityTimedOut + ActivityCanceled +) + +func (s ActivityStatus) String() string { + switch s { + case ActivityScheduled: + return "scheduled" + case ActivityStarted: + return "started" + case ActivityCompleted: + return "completed" + case ActivityFailed: + return "failed" + case ActivityTimedOut: + return "timed_out" + case ActivityCanceled: + return "canceled" + default: + return "unknown" + } +} diff --git a/temporal/job/status_test.go b/temporal/job/status_test.go new file mode 100644 index 0000000..9388e9b --- /dev/null +++ b/temporal/job/status_test.go @@ -0,0 +1,49 @@ +package job + +import ( + "testing" + + "github.com/stretchr/testify/assert" + enumspb "go.temporal.io/api/enums/v1" +) + +func TestStatus_String(t *testing.T) { + cases := map[Status]string{ + StatusUnknown: "unknown", + StatusRunning: "running", + StatusCompleted: "completed", + StatusFailed: "failed", + StatusCanceled: "canceled", + StatusTerminated: "terminated", + StatusContinuedAsNew: "continued_as_new", + StatusTimedOut: "timed_out", + } + for s, want := range cases { + assert.Equal(t, want, s.String(), "Status(%d).String()", s) + } +} + +func TestStatus_IsTerminal(t *testing.T) { + terminal := []Status{StatusCompleted, StatusFailed, StatusCanceled, StatusTerminated, StatusContinuedAsNew, StatusTimedOut} + for _, s := range terminal { + assert.True(t, s.IsTerminal(), "%s should be terminal", s) + } + assert.False(t, StatusRunning.IsTerminal()) + assert.False(t, StatusUnknown.IsTerminal()) +} + +func TestStatusFromSDK(t *testing.T) { + cases := map[enumspb.WorkflowExecutionStatus]Status{ + enumspb.WORKFLOW_EXECUTION_STATUS_UNSPECIFIED: StatusUnknown, + enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING: StatusRunning, + enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED: StatusCompleted, + enumspb.WORKFLOW_EXECUTION_STATUS_FAILED: StatusFailed, + enumspb.WORKFLOW_EXECUTION_STATUS_CANCELED: StatusCanceled, + enumspb.WORKFLOW_EXECUTION_STATUS_TERMINATED: StatusTerminated, + enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW: StatusContinuedAsNew, + enumspb.WORKFLOW_EXECUTION_STATUS_TIMED_OUT: StatusTimedOut, + } + for sdk, want := range cases { + assert.Equal(t, want, StatusFromSDK(sdk), "sdk=%v", sdk) + } +}