Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 181 additions & 0 deletions temporal/job/definition.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package job

import (
"context"
"fmt"
"sync"

"github.com/google/uuid"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)

// Definition is a type-focused description of one registered Temporal workflow.
// All per-job operations hang off the type as methods.
type Definition struct {
Name string
TaskQueue string
Description string
Tags []string
Schedule *ScheduleSpec

// Private wiring set only by New via Option closures.
register func(worker.Worker)
execute func(ctx context.Context, c client.Client, opts client.StartWorkflowOptions, input any) (client.WorkflowRun, error)
newInput func() any
}

// Option configures a Definition during construction.
type Option func(*Definition)

// WithRegister sets the worker-registration closure.
func WithRegister(fn func(worker.Worker)) Option {
return func(d *Definition) { d.register = fn }
}

// WithExecute sets the workflow-execution closure. The closure receives a
// pre-built client.StartWorkflowOptions (ID + TaskQueue + caller overrides)
// and the typed input value.
func WithExecute(fn func(ctx context.Context, c client.Client, opts client.StartWorkflowOptions, input any) (client.WorkflowRun, error)) Option {
return func(d *Definition) { d.execute = fn }
}

// WithNewInput sets the factory that returns a typed zero value of the
// workflow input. Callers fill the value before calling Execute.
func WithNewInput(fn func() any) Option {
return func(d *Definition) { d.newInput = fn }
}

// WithSchedule attaches an optional schedule specification.
func WithSchedule(spec *ScheduleSpec) Option {
return func(d *Definition) { d.Schedule = spec }
}

// WithDescription attaches a human-readable description.
func WithDescription(desc string) Option {
return func(d *Definition) { d.Description = desc }
}

// WithTags attaches user-defined tags.
func WithTags(tags ...string) Option {
return func(d *Definition) { d.Tags = tags }
}

// New constructs a Definition. Validates name, task queue, all closures, and
// the optional schedule. Returns ErrInvalidDefinition if anything is missing
// or inconsistent.
func New(name, taskQueue string, opts ...Option) (*Definition, error) {
if name == "" {
return nil, fmt.Errorf("%w: name required", ErrInvalidDefinition)
}
if taskQueue == "" {
return nil, fmt.Errorf("%w: task queue required", ErrInvalidDefinition)
}
d := &Definition{Name: name, TaskQueue: taskQueue}
for _, opt := range opts {
opt(d)
}
if d.register == nil || d.execute == nil || d.newInput == nil {
return nil, fmt.Errorf("%w: WithRegister, WithExecute, and WithNewInput are all required", ErrInvalidDefinition)
}
if d.Schedule != nil {
if err := d.Schedule.validate(); err != nil {
return nil, fmt.Errorf("%w: %s", ErrInvalidDefinition, err)
}
}
return d, nil
}

// NewInput returns a fresh typed zero value for this Definition's workflow
// input. Callers fill it before calling Execute (e.g., via json.Unmarshal).
func (d *Definition) NewInput() any {
return d.newInput()
}

// Register wires the workflow and its activities onto a worker. Safe to call
// concurrently and multiple times — the builder-supplied register closure is
// expected to use RegisterWorkflowOnce / RegisterActivityOnce for idempotency
// when the underlying workflow type may be shared across Definitions.
func (d *Definition) Register(w worker.Worker) {
if d.register == nil {
return
}
d.register(w)
}

// Execute starts a workflow run. The workflow ID defaults to "<Name>-<uuid>"
// unless overridden via WithWorkflowID(...).
func (d *Definition) Execute(ctx context.Context, c client.Client, input any, opts ...ExecuteOption) (RunHandle, error) {
if d.execute == nil {
return RunHandle{}, ErrNotRegistered
}
var cfg executeConfig
for _, opt := range opts {
opt(&cfg)
}
defaultID := d.Name + "-" + uuid.NewString()
sdkOpts := cfg.apply(defaultID, d.TaskQueue)
run, err := d.execute(ctx, c, sdkOpts, input)
if err != nil {
return RunHandle{}, translateSDKError("execute", err)
}
return RunHandle{
WorkflowID: run.GetID(),
RunID: run.GetRunID(),
raw: run,
}, nil
}

// GetRun returns a RunHandle for an existing workflow run identified by
// wfID and runID (runID "" = latest). Useful when reattaching to a run
// triggered elsewhere.
func (d *Definition) GetRun(c client.Client, wfID, runID string) RunHandle {
if c == nil {
return RunHandle{WorkflowID: wfID, RunID: runID}
}
run := c.GetWorkflow(context.Background(), wfID, runID)
return RunHandle{WorkflowID: wfID, RunID: runID, raw: run}
}

// --- Dedup helpers used by builders' Register closures ---

type registrarKey struct {
worker worker.Worker
typeName string
}

var (
registeredWorkflows sync.Map
registeredActivities sync.Map
)

// RegisterWorkflowOnce registers a workflow on a worker, returning silently
// if the (worker, typeName) pair has already been registered. Used by
// builder packages to make their RegisterAll-style helpers idempotent.
func RegisterWorkflowOnce(w worker.Worker, typeName string, wf any, opts workflow.RegisterOptions) {
key := registrarKey{w, typeName}
if _, loaded := registeredWorkflows.LoadOrStore(key, struct{}{}); loaded {
return
}
w.RegisterWorkflowWithOptions(wf, opts)
}

// RegisterActivityOnce registers an activity on a worker idempotently.
// Activity name comes from opts.Name; pass empty Name only for typed-function
// activities (rare in this codebase).
func RegisterActivityOnce(w worker.Worker, typeName string, fn any, opts activity.RegisterOptions) {
if typeName == "" {
typeName = opts.Name
}
if typeName == "" {
// Fallback: this should not happen in this codebase but provides safety.
typeName = fmt.Sprintf("%T", fn)
}
key := registrarKey{w, typeName}
if _, loaded := registeredActivities.LoadOrStore(key, struct{}{}); loaded {
return
}
w.RegisterActivityWithOptions(fn, opts)
}
150 changes: 150 additions & 0 deletions temporal/job/definition_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
//go:build integration

package job

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"

"github.com/jasoet/pkg/v2/temporal/testcontainer"
)

func echoWorkflow(ctx workflow.Context, in string) (string, error) {
var out string
if err := workflow.ExecuteActivity(workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Second,
}), echoActivity, in).Get(ctx, &out); err != nil {
return "", err
}
return out, nil
}

func echoActivity(_ context.Context, in string) (string, error) { return in, nil }

func setupTestDef(t *testing.T, c client.Client, w worker.Worker) *Definition {
t.Helper()
d, err := New("echo", "echo-tq",
WithRegister(func(w worker.Worker) {
RegisterWorkflowOnce(w, "echo", echoWorkflow, workflow.RegisterOptions{Name: "echo"})
RegisterActivityOnce(w, "echoActivity", echoActivity, activity.RegisterOptions{Name: "echoActivity"})
}),
WithExecute(func(ctx context.Context, c client.Client, opts client.StartWorkflowOptions, in any) (client.WorkflowRun, error) {
return c.ExecuteWorkflow(ctx, opts, "echo", in)
}),
WithNewInput(func() any { var s string; return &s }),
)
require.NoError(t, err)
d.Register(w)
return d
}

func TestIntegration_Definition_Execute_Describe_History(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
defer cancel()

tc, c, cleanup, err := testcontainer.Setup(ctx, testcontainer.ClientConfig{Namespace: "default"}, testcontainer.Options{})
require.NoError(t, err)
defer cleanup()
_ = tc

w := worker.New(c, "echo-tq", worker.Options{})
d := setupTestDef(t, c, w)
require.NoError(t, w.Start())
defer w.Stop()

run, err := d.Execute(ctx, c, "hello world")
require.NoError(t, err)
assert.True(t, len(run.WorkflowID) > 5, "workflow ID has prefix")

var got string
require.NoError(t, run.Get(ctx, &got))
assert.Equal(t, "hello world", got)

detail, err := d.Describe(ctx, c, run.WorkflowID, run.RunID)
require.NoError(t, err)
assert.Equal(t, StatusCompleted, detail.Status)
assert.Equal(t, "echo", detail.Type)

hist, err := d.History(ctx, c, run.WorkflowID, run.RunID, HistoryOpts{})
require.NoError(t, err)
assert.NotEmpty(t, hist.Activities)
}

func TestIntegration_Definition_Cancel(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
defer cancel()

tc, c, cleanup, err := testcontainer.Setup(ctx, testcontainer.ClientConfig{Namespace: "default"}, testcontainer.Options{})
require.NoError(t, err)
defer cleanup()
_ = tc

longWf := func(ctx workflow.Context, _ string) error {
return workflow.Sleep(ctx, 1*time.Hour)
}

d, err := New("long", "long-tq",
WithRegister(func(w worker.Worker) {
RegisterWorkflowOnce(w, "long", longWf, workflow.RegisterOptions{Name: "long"})
}),
WithExecute(func(ctx context.Context, c client.Client, opts client.StartWorkflowOptions, in any) (client.WorkflowRun, error) {
return c.ExecuteWorkflow(ctx, opts, "long", in)
}),
WithNewInput(func() any { var s string; return &s }),
)
require.NoError(t, err)

w := worker.New(c, "long-tq", worker.Options{})
d.Register(w)
require.NoError(t, w.Start())
defer w.Stop()

run, err := d.Execute(ctx, c, "x")
require.NoError(t, err)

require.NoError(t, d.Cancel(ctx, c, run.WorkflowID, run.RunID))

// Wait a moment for cancellation to propagate
time.Sleep(2 * time.Second)
detail, err := d.Describe(ctx, c, run.WorkflowID, run.RunID)
require.NoError(t, err)
assert.Equal(t, StatusCanceled, detail.Status)
}

func TestIntegration_Definition_ListRuns_ScopedByName(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
defer cancel()

tc, c, cleanup, err := testcontainer.Setup(ctx, testcontainer.ClientConfig{Namespace: "default"}, testcontainer.Options{})
require.NoError(t, err)
defer cleanup()
_ = tc

w := worker.New(c, "echo-tq", worker.Options{})
d := setupTestDef(t, c, w)
require.NoError(t, w.Start())
defer w.Stop()

// Run twice.
r1, err := d.Execute(ctx, c, "one")
require.NoError(t, err)
require.NoError(t, r1.Get(ctx, new(string)))
r2, err := d.Execute(ctx, c, "two")
require.NoError(t, err)
require.NoError(t, r2.Get(ctx, new(string)))

// Visibility settles asynchronously.
time.Sleep(2 * time.Second)

page, err := d.ListRuns(ctx, c, ListOpts{PageSize: 10})
require.NoError(t, err)
assert.GreaterOrEqual(t, len(page.Runs), 2, "ListRuns scoped by Name prefix returns both runs")
}
Loading
Loading