diff --git a/runtime/exec/environment.go b/runtime/exec/environment.go index de564a1cf3..a3f495ed04 100644 --- a/runtime/exec/environment.go +++ b/runtime/exec/environment.go @@ -172,6 +172,13 @@ func (c *closePuller) Pull(done bool) (sbuf.Batch, error) { } func (e *Environment) VectorOpen(ctx context.Context, sctx *super.Context, path, format string, p sbuf.Pushdown, concurrentReaders int) (VectorConcurrentPuller, error) { + if format != "csup" && format != "fjson" && format != "parquet" { + sbufPuller, err := e.Open(ctx, sctx, path, format, p) + if err != nil { + return nil, err + } + return sbuf.NewDematerializer(sctx, sbufPuller), nil + } if path == "-" { path = "stdio:stdin" } @@ -192,13 +199,32 @@ func (e *Environment) VectorOpen(ctx context.Context, sctx *super.Context, path, case "fjson": puller = fjsonio.NewVectorReader(ctx, sctx, reader, p, concurrentReaders) default: - var sbufPuller sbuf.Puller - sbufPuller, err = e.Open(ctx, sctx, path, format, p) - puller = sbuf.NewDematerializer(sctx, sbufPuller) + panic(format) } if err != nil { reader.Close() - return nil, err + return nil, fmt.Errorf("%s: %w", path, err) + } + return &errorPrefixConcurrentPuller{puller, path}, nil +} + +type errorPrefixConcurrentPuller struct { + VectorConcurrentPuller + prefix string +} + +func (e *errorPrefixConcurrentPuller) ConcurrentPull(done bool, id int) (vector.Any, error) { + vec, err := e.VectorConcurrentPuller.ConcurrentPull(done, id) + if err != nil { + err = fmt.Errorf("%s: %w", e.prefix, err) + } + return vec, err +} + +func (e *errorPrefixConcurrentPuller) Pull(done bool) (vector.Any, error) { + vec, err := e.VectorConcurrentPuller.Pull(done) + if err != nil { + err = fmt.Errorf("%s: %w", e.prefix, err) } - return puller, nil + return vec, err } diff --git a/sio/csupio/vectorreader.go b/sio/csupio/vectorreader.go index 8950b80a78..83e0da7a72 100644 --- a/sio/csupio/vectorreader.go +++ b/sio/csupio/vectorreader.go @@ -37,6 +37,10 @@ func NewVectorReader(ctx context.Context, sctx *super.Context, r io.Reader, p sb if !ok { return nil, errors.New("Super Columnar requires a seekable input") } + var buf [1]byte + if _, err := ra.ReadAt(buf[:], 0); err != nil { + return nil, errors.New("Super Columnar requires a seekable input") + } var metaFilters []*metafilter if p != nil { filter, _, err := p.MetaFilter() diff --git a/sio/csupio/ztests/stdin-error.yaml b/sio/csupio/ztests/stdin-error.yaml index 296f3f0949..7ce29311dd 100644 --- a/sio/csupio/ztests/stdin-error.yaml +++ b/sio/csupio/ztests/stdin-error.yaml @@ -2,6 +2,8 @@ script: | super -f csup -o t.csup in.sup ! cat t.csup | super -i csup - +vector: true + inputs: - name: in.sup data: | diff --git a/sio/fjsonio/ztests/parser-error.yaml b/sio/fjsonio/ztests/parser-error.yaml index e29e4db399..247ff92898 100644 --- a/sio/fjsonio/ztests/parser-error.yaml +++ b/sio/fjsonio/ztests/parser-error.yaml @@ -1,6 +1,8 @@ # Test parse error on EOF. spq: pass +vector: true + input-flags: '-i fjson' input: |