diff --git a/Dockerfile b/Dockerfile index b5cf7c2..678af07 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,8 +15,8 @@ FROM alpine:latest - # Install ca-certificates, libwebp and vips runtime (with fallback for ARM64) - RUN apk --no-cache add ca-certificates && \ + # Runtime deps: ffmpeg ships ffprobe; libwebp/vips have an ARM64 fallback. + RUN apk --no-cache add ca-certificates ffmpeg && \ (apk add --no-cache libwebp vips || echo "libwebp/vips not available for this platform") # Create a non-root user diff --git a/examples/storage-config.yaml b/examples/storage-config.yaml index c7d77a8..61ace4b 100644 --- a/examples/storage-config.yaml +++ b/examples/storage-config.yaml @@ -63,12 +63,29 @@ profiles: token_ttl_seconds: 1800 # 30 minutes storage_path: "originals/videos/{shard?}/{key_base}" enable_sharding: true - + # Processing configuration (future implementation) proxy_folder: "proxies/videos" formats: ["mp4", "webm"] quality: 80 + trailer: + kind: "video" + allowed_mimes: ["video/mp4", "video/quicktime"] + size_max_bytes: 78643200 # 75MB + multipart_threshold_mb: 15 + part_size_mb: 8 + token_ttl_seconds: 1800 + storage_path: "originals/trailers/{key_base}" + enable_sharding: false + proxy_folder: "proxies/trailers" + formats: ["mp4", "webm"] + quality: 80 + max_duration_seconds: 45 + min_width: 1280 + min_height: 720 + allowed_codecs: ["h264", "hevc"] + default: # Upload configuration kind: "image" diff --git a/internal/config/config.go b/internal/config/config.go index 63c7adf..7703a46 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -69,8 +69,14 @@ type Profile struct { ConvertTo string `yaml:"convert_to,omitempty"` // Processing configuration (videos) - ProxyFolder string `yaml:"proxy_folder,omitempty"` - Formats []string `yaml:"formats,omitempty"` + ProxyFolder string `yaml:"proxy_folder,omitempty"` + Formats []string `yaml:"formats,omitempty"` + MaxDurationSeconds int `yaml:"max_duration_seconds,omitempty"` + MinWidth int `yaml:"min_width,omitempty"` + MinHeight int `yaml:"min_height,omitempty"` + MaxWidth int `yaml:"max_width,omitempty"` + MaxHeight int `yaml:"max_height,omitempty"` + AllowedCodecs []string `yaml:"allowed_codecs,omitempty"` } type StorageConfig struct { diff --git a/internal/probe/probe.go b/internal/probe/probe.go new file mode 100644 index 0000000..c308488 --- /dev/null +++ b/internal/probe/probe.go @@ -0,0 +1,261 @@ +// Package probe runs ffprobe over a presigned GET URL and validates the +// resulting metadata against a profile's constraint fields. +package probe + + +import ( + "context" + "encoding/json" + "fmt" + "os/exec" + "strconv" + "strings" + "time" + + "mediaflow/internal/config" +) + +type Result struct { + Asset AssetInfo `json:"asset"` + Video *Stream `json:"video,omitempty"` + Audio *Stream `json:"audio,omitempty"` + Reasons []Reason `json:"reasons"` + OK bool `json:"ok"` +} + +type AssetInfo struct { + ObjectKey string `json:"object_key"` + SizeBytes int64 `json:"size_bytes"` + Container string `json:"container"` +} + +type Stream struct { + DurationSeconds float64 `json:"duration_seconds,omitempty"` + Width int `json:"width,omitempty"` + Height int `json:"height,omitempty"` + Codec string `json:"codec,omitempty"` + BitrateKbps int `json:"bitrate_kbps,omitempty"` + Framerate float64 `json:"framerate,omitempty"` + Channels int `json:"channels,omitempty"` + SampleRateHz int `json:"sample_rate_hz,omitempty"` +} + +type Reason struct { + Code string `json:"code"` + Limit any `json:"limit,omitempty"` + Actual any `json:"actual,omitempty"` +} + +// ffprobe -print_format json -show_format -show_streams output (subset). +type ffprobeOutput struct { + Format ffprobeFormat `json:"format"` + Streams []ffprobeStream `json:"streams"` +} + +type ffprobeFormat struct { + Duration string `json:"duration"` + Size string `json:"size"` + BitRate string `json:"bit_rate"` + FormatName string `json:"format_name"` +} + +type ffprobeStream struct { + CodecType string `json:"codec_type"` + CodecName string `json:"codec_name"` + Width int `json:"width"` + Height int `json:"height"` + Duration string `json:"duration"` + BitRate string `json:"bit_rate"` + RFrameRate string `json:"r_frame_rate"` + Channels int `json:"channels"` + SampleRate string `json:"sample_rate"` +} + +const probeTimeout = 30 * time.Second + +func Probe(ctx context.Context, presignedGetURL, objectKey string, profile *config.Profile) (*Result, error) { + ctx, cancel := context.WithTimeout(ctx, probeTimeout) + defer cancel() + + cmd := exec.CommandContext(ctx, + "ffprobe", + "-v", "quiet", + "-print_format", "json", + "-show_format", + "-show_streams", + presignedGetURL, + ) + out, err := cmd.Output() + if err != nil { + return nil, fmt.Errorf("ffprobe failed: %w", err) + } + + var raw ffprobeOutput + if err := json.Unmarshal(out, &raw); err != nil { + return nil, fmt.Errorf("parse ffprobe output: %w", err) + } + + res := buildResult(raw, objectKey) + res.Reasons = checkConstraints(res, profile) + res.OK = len(res.Reasons) == 0 + return res, nil +} + +func buildResult(raw ffprobeOutput, objectKey string) *Result { + res := &Result{ + Asset: AssetInfo{ + ObjectKey: objectKey, + SizeBytes: parseInt64(raw.Format.Size), + Container: pickContainer(raw.Format.FormatName), + }, + Reasons: []Reason{}, + } + + for _, s := range raw.Streams { + switch s.CodecType { + case "video": + if res.Video != nil { + continue + } + duration := parseFloat(s.Duration) + if duration == 0 { + duration = parseFloat(raw.Format.Duration) + } + res.Video = &Stream{ + DurationSeconds: duration, + Width: s.Width, + Height: s.Height, + Codec: s.CodecName, + BitrateKbps: bitrateKbps(s.BitRate, raw.Format.BitRate), + Framerate: parseFraction(s.RFrameRate), + } + case "audio": + if res.Audio != nil { + continue + } + res.Audio = &Stream{ + Codec: s.CodecName, + BitrateKbps: bitrateKbps(s.BitRate, ""), + Channels: s.Channels, + SampleRateHz: int(parseInt64(s.SampleRate)), + } + } + } + + return res +} + +func checkConstraints(res *Result, profile *config.Profile) []Reason { + reasons := []Reason{} + + if res.Video == nil { + reasons = append(reasons, Reason{Code: "no_video_stream"}) + return reasons + } + + v := res.Video + + if profile.MaxDurationSeconds > 0 && v.DurationSeconds > float64(profile.MaxDurationSeconds) { + reasons = append(reasons, Reason{ + Code: "duration_exceeded", + Limit: profile.MaxDurationSeconds, + Actual: v.DurationSeconds, + }) + } + if profile.MinWidth > 0 && v.Width < profile.MinWidth { + reasons = append(reasons, Reason{ + Code: "width_too_low", + Limit: profile.MinWidth, + Actual: v.Width, + }) + } + if profile.MinHeight > 0 && v.Height < profile.MinHeight { + reasons = append(reasons, Reason{ + Code: "height_too_low", + Limit: profile.MinHeight, + Actual: v.Height, + }) + } + if profile.MaxWidth > 0 && v.Width > profile.MaxWidth { + reasons = append(reasons, Reason{ + Code: "width_too_high", + Limit: profile.MaxWidth, + Actual: v.Width, + }) + } + if profile.MaxHeight > 0 && v.Height > profile.MaxHeight { + reasons = append(reasons, Reason{ + Code: "height_too_high", + Limit: profile.MaxHeight, + Actual: v.Height, + }) + } + if len(profile.AllowedCodecs) > 0 && !contains(profile.AllowedCodecs, v.Codec) { + reasons = append(reasons, Reason{ + Code: "codec_not_allowed", + Limit: profile.AllowedCodecs, + Actual: v.Codec, + }) + } + + return reasons +} + +func parseInt64(s string) int64 { + n, _ := strconv.ParseInt(s, 10, 64) + return n +} + +func parseFloat(s string) float64 { + f, _ := strconv.ParseFloat(s, 64) + return f +} + +// ffprobe emits framerate as "30000/1001"; evaluate the fraction. +func parseFraction(s string) float64 { + if s == "" { + return 0 + } + parts := strings.SplitN(s, "/", 2) + if len(parts) == 1 { + return parseFloat(parts[0]) + } + num := parseFloat(parts[0]) + den := parseFloat(parts[1]) + if den == 0 { + return 0 + } + return num / den +} + +// ffprobe bit_rate fields are bps strings; convert to kbps with format-level fallback. +func bitrateKbps(stream, format string) int { + bps := parseInt64(stream) + if bps == 0 { + bps = parseInt64(format) + } + if bps == 0 { + return 0 + } + return int(bps / 1000) +} + +// ffprobe emits comma-joined container names like "mov,mp4,m4a,3gp,3g2,mj2"; take the first. +func pickContainer(formatName string) string { + if formatName == "" { + return "" + } + if i := strings.Index(formatName, ","); i >= 0 { + return formatName[:i] + } + return formatName +} + +func contains(haystack []string, needle string) bool { + for _, s := range haystack { + if s == needle { + return true + } + } + return false +} diff --git a/internal/probe/probe_test.go b/internal/probe/probe_test.go new file mode 100644 index 0000000..43d249d --- /dev/null +++ b/internal/probe/probe_test.go @@ -0,0 +1,112 @@ +package probe + +import ( + "testing" + + "mediaflow/internal/config" +) + +func TestCheckConstraints(t *testing.T) { + shopTrailer := &config.Profile{ + MaxDurationSeconds: 45, + MinWidth: 1280, + MinHeight: 720, + AllowedCodecs: []string{"h264", "hevc"}, + } + + cases := []struct { + name string + profile *config.Profile + video *Stream + wantOK bool + wantCodes []string + }{ + { + name: "passes shop-trailer", + profile: shopTrailer, + video: &Stream{DurationSeconds: 30, Width: 1920, Height: 1080, Codec: "h264"}, + wantOK: true, + }, + { + name: "duration over limit", + profile: shopTrailer, + video: &Stream{DurationSeconds: 67.4, Width: 1920, Height: 1080, Codec: "h264"}, + wantOK: false, + wantCodes: []string{"duration_exceeded"}, + }, + { + name: "below min dimensions", + profile: shopTrailer, + video: &Stream{DurationSeconds: 30, Width: 854, Height: 480, Codec: "h264"}, + wantOK: false, + wantCodes: []string{"width_too_low", "height_too_low"}, + }, + { + name: "disallowed codec", + profile: shopTrailer, + video: &Stream{DurationSeconds: 30, Width: 1920, Height: 1080, Codec: "vp9"}, + wantOK: false, + wantCodes: []string{"codec_not_allowed"}, + }, + { + name: "unset constraints permit anything", + profile: &config.Profile{}, + video: &Stream{DurationSeconds: 9999, Width: 16, Height: 16, Codec: "wmv3"}, + wantOK: true, + }, + { + name: "no video stream", + profile: shopTrailer, + video: nil, + wantOK: false, + wantCodes: []string{"no_video_stream"}, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + res := &Result{Video: tc.video} + reasons := checkConstraints(res, tc.profile) + ok := len(reasons) == 0 + if ok != tc.wantOK { + t.Fatalf("ok=%v want=%v reasons=%+v", ok, tc.wantOK, reasons) + } + if len(reasons) != len(tc.wantCodes) { + t.Fatalf("got %d reasons want %d: %+v", len(reasons), len(tc.wantCodes), reasons) + } + for i, code := range tc.wantCodes { + if reasons[i].Code != code { + t.Errorf("reason[%d].Code=%q want %q", i, reasons[i].Code, code) + } + } + }) + } +} + +func TestParseFraction(t *testing.T) { + cases := map[string]float64{ + "": 0, + "30": 30, + "30000/1001": 30000.0 / 1001.0, + "30/0": 0, + } + for in, want := range cases { + got := parseFraction(in) + if got != want { + t.Errorf("parseFraction(%q) = %v want %v", in, got, want) + } + } +} + +func TestPickContainer(t *testing.T) { + cases := map[string]string{ + "": "", + "mp4": "mp4", + "mov,mp4,m4a,3gp,3g2,mj2": "mov", + } + for in, want := range cases { + if got := pickContainer(in); got != want { + t.Errorf("pickContainer(%q) = %q want %q", in, got, want) + } + } +} diff --git a/internal/s3/client.go b/internal/s3/client.go index 011be1c..4e1e189 100644 --- a/internal/s3/client.go +++ b/internal/s3/client.go @@ -114,6 +114,27 @@ func (c *Client) PresignPutObject(ctx context.Context, key string, expires time. return request.URL, nil } +func (c *Client) PresignGetObject(ctx context.Context, key string, expires time.Duration) (string, error) { + request, err := c.presigner.PresignGetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(c.bucket), + Key: aws.String(key), + }, func(opts *s3.PresignOptions) { + opts.Expires = expires + }) + if err != nil { + return "", err + } + return request.URL, nil +} + +func (c *Client) HeadObject(ctx context.Context, key string) error { + _, err := c.s3Client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(c.bucket), + Key: aws.String(key), + }) + return err +} + // CreateMultipartUpload creates a multipart upload and returns the upload ID func (c *Client) CreateMultipartUpload(ctx context.Context, key string, headers map[string]string) (string, error) { input := &s3.CreateMultipartUploadInput{ diff --git a/internal/upload/handlers.go b/internal/upload/handlers.go index 7483729..620271a 100644 --- a/internal/upload/handlers.go +++ b/internal/upload/handlers.go @@ -6,8 +6,10 @@ import ( "fmt" "net/http" "strings" + "time" "mediaflow/internal/config" + "mediaflow/internal/probe" ) type Handler struct { @@ -83,7 +85,7 @@ func (h *Handler) HandlePresign(w http.ResponseWriter, r *http.Request) { scheme = "https" } baseURL := fmt.Sprintf("%s://%s", scheme, r.Host) - + // Generate presigned upload presignResp, err := h.uploadService.PresignUpload(h.ctx, &req, profile, baseURL) if err != nil { @@ -121,7 +123,7 @@ func (h *Handler) HandleCompleteMultipart(w http.ResponseWriter, r *http.Request h.writeError(w, http.StatusBadRequest, ErrBadRequest, "Invalid URL format", "Expected /v1/uploads/{object_key}/complete/{upload_id}") return } - + objectKey := parts[0] uploadID := parts[1] @@ -167,7 +169,7 @@ func (h *Handler) HandleAbortMultipart(w http.ResponseWriter, r *http.Request) { h.writeError(w, http.StatusBadRequest, ErrBadRequest, "Invalid URL format", "Expected /v1/uploads/{object_key}/abort/{upload_id}") return } - + objectKey := parts[0] uploadID := parts[1] @@ -186,6 +188,19 @@ func (h *Handler) HandleAbortMultipart(w http.ResponseWriter, r *http.Request) { _ = json.NewEncoder(w).Encode(response) } +// RouteAssets dispatches /v1/assets/{profile}/{key_base}[/probe] to the right +// handler based on method + suffix. +func (h *Handler) RouteAssets(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodPost && strings.HasSuffix(r.URL.Path, "/probe"): + h.HandleProbeAsset(w, r) + case r.Method == http.MethodDelete: + h.HandleDeleteAsset(w, r) + default: + h.writeError(w, http.StatusMethodNotAllowed, ErrBadRequest, "Method not allowed", "") + } +} + // HandleDeleteAsset handles DELETE /v1/assets/{profile}/{key_base} // Deletes the original file and all generated thumbnails for an asset. func (h *Handler) HandleDeleteAsset(w http.ResponseWriter, r *http.Request) { @@ -194,17 +209,12 @@ func (h *Handler) HandleDeleteAsset(w http.ResponseWriter, r *http.Request) { return } - // Extract profile and key_base from URL path - path := strings.TrimPrefix(r.URL.Path, "/v1/assets/") - slashIdx := strings.Index(path, "/") - if slashIdx < 1 || slashIdx == len(path)-1 { + profileName, keyBase, ok := parseAssetPath(r.URL.Path, "") + if !ok { h.writeError(w, http.StatusBadRequest, ErrBadRequest, "Invalid URL format", "Expected /v1/assets/{profile}/{key_base}") return } - profileName := path[:slashIdx] - keyBase := path[slashIdx+1:] - // Look up profile config profile := h.storageConfig.GetProfile(profileName) if profile == nil { @@ -223,14 +233,80 @@ func (h *Handler) HandleDeleteAsset(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) response := map[string]any{ - "status": "deleted", - "profile": profileName, - "key_base": keyBase, + "status": "deleted", + "profile": profileName, + "key_base": keyBase, "objects_deleted": deleted, } _ = json.NewEncoder(w).Encode(response) } +// HandleProbeAsset handles POST /v1/assets/{profile}/{key_base}/probe. +// Returns 200 for both pass and fail; `ok` is the gate, not the status code. +func (h *Handler) HandleProbeAsset(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + h.writeError(w, http.StatusMethodNotAllowed, ErrBadRequest, "Method not allowed", "Use POST") + return + } + + profileName, keyBase, ok := parseAssetPath(r.URL.Path, "/probe") + if !ok { + h.writeError(w, http.StatusBadRequest, ErrBadRequest, "Invalid URL format", "Expected /v1/assets/{profile}/{key_base}/probe") + return + } + + profile := h.storageConfig.GetProfile(profileName) + if profile == nil { + h.writeError(w, http.StatusNotFound, ErrBadRequest, fmt.Sprintf("Unknown profile: %s", profileName), "") + return + } + if profile.Kind != "video" { + h.writeError(w, http.StatusUnprocessableEntity, ErrBadRequest, "Probe requires kind=video", profileName) + return + } + + objectKey := h.uploadService.ResolveAssetKey(profile, keyBase) + + if err := h.uploadService.AssetExists(r.Context(), objectKey); err != nil { + h.writeError(w, http.StatusNotFound, ErrBadRequest, "Asset not found", objectKey) + return + } + + presignURL, err := h.uploadService.PresignGet(r.Context(), objectKey, 120*time.Second) + if err != nil { + fmt.Printf("Probe presign error: %v\n", err) + h.writeError(w, http.StatusInternalServerError, ErrBadRequest, "Failed to presign GET", err.Error()) + return + } + + result, err := probe.Probe(r.Context(), presignURL, objectKey, profile) + if err != nil { + fmt.Printf("Probe error key=%s: %v\n", objectKey, err) + h.writeError(w, http.StatusBadGateway, ErrUpstream, "Probe failed", err.Error()) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(result) +} + +// parseAssetPath extracts {profile} and {key_base} from /v1/assets/{profile}/{key_base}{suffix}. +func parseAssetPath(urlPath, suffix string) (profile, keyBase string, ok bool) { + path := strings.TrimPrefix(urlPath, "/v1/assets/") + if suffix != "" { + if !strings.HasSuffix(path, suffix) { + return "", "", false + } + path = strings.TrimSuffix(path, suffix) + } + slashIdx := strings.Index(path, "/") + if slashIdx < 1 || slashIdx == len(path)-1 { + return "", "", false + } + return path[:slashIdx], path[slashIdx+1:], true +} + // writeError writes a standardized error response func (h *Handler) writeError(w http.ResponseWriter, statusCode int, code, message, hint string) { errorResp := ErrorResponse{ diff --git a/internal/upload/interfaces.go b/internal/upload/interfaces.go index c0ba5df..ed0d9c3 100644 --- a/internal/upload/interfaces.go +++ b/internal/upload/interfaces.go @@ -11,9 +11,11 @@ import ( type S3Client interface { CreateMultipartUpload(ctx context.Context, key string, headers map[string]string) (string, error) PresignPutObject(ctx context.Context, key string, expires time.Duration, headers map[string]string) (string, error) + PresignGetObject(ctx context.Context, key string, expires time.Duration) (string, error) PresignUploadPart(ctx context.Context, key, uploadID string, partNumber int32, expires time.Duration) (string, error) CompleteMultipartUpload(ctx context.Context, key, uploadID string, parts []s3.PartInfo) error AbortMultipartUpload(ctx context.Context, key, uploadID string) error DeleteObject(ctx context.Context, key string) error + HeadObject(ctx context.Context, key string) error ListByPrefix(ctx context.Context, prefix string) ([]string, error) } \ No newline at end of file diff --git a/internal/upload/service.go b/internal/upload/service.go index b20e54f..a0ceb78 100644 --- a/internal/upload/service.go +++ b/internal/upload/service.go @@ -236,15 +236,26 @@ func (s *Service) AbortMultipartUpload(ctx context.Context, objectKey, uploadID return s.s3Client.AbortMultipartUpload(ctx, objectKey, uploadID) } -// DeleteAsset deletes an asset's original file and all generated thumbnails from R2. -// It resolves the storage paths from the profile config, handling sharding if enabled. -func (s *Service) DeleteAsset(ctx context.Context, profile *config.Profile, keyBase string) (int, error) { - // Build the original object key (same logic as upload) +func (s *Service) ResolveAssetKey(profile *config.Profile, keyBase string) string { shard := "" if profile.EnableSharding { shard = GenerateShard(keyBase) } - originalKey := s.buildObjectKey(profile.StoragePath, keyBase, "", shard) + return s.buildObjectKey(profile.StoragePath, keyBase, "", shard) +} + +func (s *Service) PresignGet(ctx context.Context, objectKey string, ttl time.Duration) (string, error) { + return s.s3Client.PresignGetObject(ctx, objectKey, ttl) +} + +func (s *Service) AssetExists(ctx context.Context, objectKey string) error { + return s.s3Client.HeadObject(ctx, objectKey) +} + +// DeleteAsset deletes an asset's original file and all generated thumbnails from R2. +// It resolves the storage paths from the profile config, handling sharding if enabled. +func (s *Service) DeleteAsset(ctx context.Context, profile *config.Profile, keyBase string) (int, error) { + originalKey := s.ResolveAssetKey(profile, keyBase) deleted := 0 diff --git a/internal/upload/service_test.go b/internal/upload/service_test.go index 1819c1e..6a7cd38 100644 --- a/internal/upload/service_test.go +++ b/internal/upload/service_test.go @@ -62,6 +62,14 @@ func (m *MockS3Client) ListByPrefix(ctx context.Context, prefix string) ([]strin return nil, nil } +func (m *MockS3Client) PresignGetObject(ctx context.Context, key string, expires time.Duration) (string, error) { + return "https://test.s3.amazonaws.com/bucket/" + key, nil +} + +func (m *MockS3Client) HeadObject(ctx context.Context, key string) error { + return nil +} + func TestGenerateShard(t *testing.T) { tests := []struct { keyBase string diff --git a/internal/upload/types.go b/internal/upload/types.go index 7d24215..d8a0ac0 100644 --- a/internal/upload/types.go +++ b/internal/upload/types.go @@ -111,4 +111,5 @@ const ( ErrStorageDenied = "storage_denied" ErrBadRequest = "bad_request" ErrRateLimited = "rate_limited" + ErrUpstream = "upstream_error" ) \ No newline at end of file diff --git a/main.go b/main.go index b59f272..0db4de2 100644 --- a/main.go +++ b/main.go @@ -69,8 +69,8 @@ func main() { } }) - // Asset deletion (auth required) - mux.Handle("/v1/assets/", authMiddleware(http.HandlerFunc(uploadHandler.HandleDeleteAsset))) + // Asset operations (auth required) + mux.Handle("/v1/assets/", authMiddleware(http.HandlerFunc(uploadHandler.RouteAssets))) // Health check mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {