From a3302680d8db5bf791ac699019a6a3c79f9f8a1b Mon Sep 17 00:00:00 2001 From: Matthew Nibecker Date: Thu, 21 May 2026 11:14:01 -0700 Subject: [PATCH] csup.Writer: Use vbuild.Builder This commit improves the csup.Writer but using vbuild.Builder to build vectors that eventually get encoded as CSUP. vbuild.Builder has improvements over the what existed is csup.Writer including the ability to handle vectors with Views, Consts or Dicts. This pr also removes Scode columns in CSUP, changing things so bool, ip, net, enum vectors are now natively encoded. --- csup/array.go | 34 ++----- csup/bool.go | 56 +++++++++++ csup/bytes.go | 178 ++++++++++++++++---------------- csup/const.go | 21 ++++ csup/dict.go | 63 +----------- csup/dynamic.go | 74 +++----------- csup/encoder.go | 195 +++++++++++++++++++++++++++--------- csup/enum.go | 23 +++++ csup/field.go | 6 -- csup/float.go | 71 ++----------- csup/fusion.go | 18 +--- csup/header.go | 2 +- csup/int.go | 148 ++++++++------------------- csup/ip.go | 67 +++++++++++++ csup/map.go | 22 +--- csup/metadata.go | 77 +++++++++++--- csup/named.go | 29 +++--- csup/net.go | 67 +++++++++++++ csup/none.go | 22 ++++ csup/null.go | 22 ++++ csup/record.go | 24 ++--- csup/scode.go | 143 -------------------------- csup/type.go | 30 +----- csup/union.go | 116 ++++----------------- csup/writer.go | 16 +-- csup/ztests/const.yaml | 2 +- runtime/vcache/bool.go | 49 +++++++++ runtime/vcache/bytes.go | 16 +-- runtime/vcache/enum.go | 33 ++++++ runtime/vcache/ip.go | 51 ++++++++++ runtime/vcache/net.go | 50 +++++++++ runtime/vcache/none.go | 28 ++++++ runtime/vcache/null.go | 28 ++++++ runtime/vcache/primitive.go | 140 -------------------------- runtime/vcache/shadow.go | 14 ++- vector/vbuild/dynamic.go | 8 +- 36 files changed, 980 insertions(+), 963 deletions(-) create mode 100644 csup/bool.go create mode 100644 csup/const.go create mode 100644 csup/enum.go create mode 100644 csup/ip.go create mode 100644 csup/net.go create mode 100644 csup/none.go create mode 100644 csup/null.go delete mode 100644 csup/scode.go create mode 100644 runtime/vcache/bool.go create mode 100644 runtime/vcache/enum.go create mode 100644 runtime/vcache/ip.go create mode 100644 runtime/vcache/net.go create mode 100644 runtime/vcache/none.go create mode 100644 runtime/vcache/null.go delete mode 100644 runtime/vcache/primitive.go diff --git a/csup/array.go b/csup/array.go index 845051deed..8a64cdb872 100644 --- a/csup/array.go +++ b/csup/array.go @@ -11,35 +11,16 @@ import ( type ArrayEncoder struct { typ super.Type values Encoder - offsets *offsetsEncoder + offsets *Uint32Encoder count uint32 } var _ Encoder = (*ArrayEncoder)(nil) -func NewArrayEncoder(cctx *Context, typ *super.TypeArray) *ArrayEncoder { +func NewArrayEncoder(cctx *Context, vec *vector.Array) *ArrayEncoder { return &ArrayEncoder{ - typ: typ.Type, - values: NewEncoder(cctx, typ.Type), - offsets: newOffsetsEncoder(), - } -} - -func (a *ArrayEncoder) Write(vec vector.Any) { - if vec.Len() == 0 { - return - } - switch vec := vec.(type) { - case *vector.Array: - a.count += vec.Len() - a.values.Write(vec.Values) - a.offsets.write(vec.Offsets) - case *vector.Set: - a.count += vec.Len() - a.values.Write(vec.Values) - a.offsets.write(vec.Offsets) - default: - panic(vec) + values: NewEncoder(cctx, vec.Values), + offsets: NewUint32Encoder(vec.Offsets), } } @@ -69,12 +50,11 @@ type SetEncoder struct { ArrayEncoder } -func NewSetEncoder(cctx *Context, typ *super.TypeSet) *SetEncoder { +func NewSetEncoder(cctx *Context, vec *vector.Set) *SetEncoder { return &SetEncoder{ ArrayEncoder{ - typ: typ.Type, - values: NewEncoder(cctx, typ.Type), - offsets: newOffsetsEncoder(), + values: NewEncoder(cctx, vec.Values), + offsets: &Uint32Encoder{vals: vec.Offsets}, }, } } diff --git a/csup/bool.go b/csup/bool.go new file mode 100644 index 0000000000..8b0377bd9b --- /dev/null +++ b/csup/bool.go @@ -0,0 +1,56 @@ +package csup + +import ( + "io" + + "github.com/brimdata/super/pkg/byteconv" + "github.com/brimdata/super/vector" + "github.com/brimdata/super/vector/bitvec" + "golang.org/x/sync/errgroup" +) + +type BoolEncoder struct { + bits bitvec.Bits + + // created on Encode + out []byte + fmt uint8 + bytesLen uint64 +} + +func NewBoolEncoder(vec *vector.Bool) Encoder { + return &BoolEncoder{bits: vec.Bits} +} + +func (b *BoolEncoder) Encode(group *errgroup.Group) { + group.Go(func() error { + bytes := byteconv.ReinterpretSlice[byte](b.bits.GetBits()) + f, out, err := compressBuffer(bytes) + if err != nil { + return err + } + b.fmt = f + b.out = out + b.bytesLen = uint64(len(bytes)) + return nil + }) +} + +func (b *BoolEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { + loc := Segment{ + Offset: off, + MemLength: uint64(len(b.out)), + Length: b.bytesLen, + CompressionFormat: b.fmt, + } + off += loc.Length + return off, cctx.enter(&Bool{ + Location: loc, + Count: b.bits.Len(), + }) +} + +func (b *BoolEncoder) Emit(w io.Writer) error { + _, err := w.Write(b.out) + return err +} diff --git a/csup/bytes.go b/csup/bytes.go index 456690a776..45ddd88b15 100644 --- a/csup/bytes.go +++ b/csup/bytes.go @@ -12,72 +12,121 @@ import ( type BytesEncoder struct { typ super.Type + bytes *BytesTableEncoder min, max []byte - bytes []byte - offsets *offsetsEncoder - - // These values are used for the Encode pass. - bytesFmt uint8 - bytesOut []byte - bytesLen uint64 } -func NewBytesEncoder(typ super.Type) *BytesEncoder { +func NewBytesEncoder(typ super.Type, table vector.BytesTable) *BytesEncoder { return &BytesEncoder{ - typ: typ, - offsets: newOffsetsEncoder(), + typ: typ, + bytes: &BytesTableEncoder{table: table}, } } -func (b *BytesEncoder) Write(vec vector.Any) { - if vec.Len() == 0 { - return - } - switch vec := vec.(type) { - case *vector.Bytes: - b.writeTable(vec.Table()) - case *vector.String: - b.writeTable(vec.Table()) - default: - panic(vec) - } +func (b *BytesEncoder) Encode(group *errgroup.Group) { + b.bytes.Encode(group) + group.Go(func() error { + table := b.bytes.table + if table.Len() == 0 { + return nil + } + b.min, b.max = table.Bytes(0), table.Bytes(0) + for i := range table.Len() { + v := table.Bytes(i) + if bytes.Compare(v, b.min) < 0 { + b.min = v + } + if bytes.Compare(v, b.max) > 0 { + b.max = v + } + } + return nil + }) } -func (b *BytesEncoder) writeTable(table vector.BytesTable) { - if len(b.bytes) == 0 { - val := table.Bytes(0) - b.min = append(b.min[:0], val...) - b.max = append(b.max[:0], val...) - } - for slot := range table.Len() { - val := table.Bytes(slot) - if bytes.Compare(val, b.min) < 0 { - b.min = append(b.min[:0], val...) +func (b *BytesEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { + off, bytesLoc, offsLoc := b.bytes.Segment(off) + return off, cctx.enter(&Bytes{ + Typ: b.typ, + Bytes: bytesLoc, + Offsets: offsLoc, + Min: b.min, + Max: b.max, + Count: b.bytes.table.Len(), + }) +} + +func (b *BytesEncoder) Emit(w io.Writer) error { + return b.bytes.Emit(w) +} + +func maybeStringBytesDict(typ super.Type, table vector.BytesTable) vector.Any { + flat := func(table vector.BytesTable) vector.Any { + if typ.ID() == super.IDString { + return vector.NewString(table) } - if bytes.Compare(val, b.max) > 0 { - b.max = append(b.max[:0], val...) + return vector.NewBytes(table) + } + var counts []uint32 + m := make(map[string]byte) + index := make([]byte, table.Len()) + out := vector.NewBytesTableEmpty(0) + for i := range table.Len() { + tag, ok := m[string(table.Bytes(i))] + if !ok { + if len(counts) > math.MaxUint8 { + return flat(table) + } + tag = byte(len(counts)) + b := table.Bytes(i) + m[string(b)] = tag + counts = append(counts, 0) + out.Append(b) } + index[i] = tag + counts[tag]++ } - b.bytes = append(b.bytes, table.RawBytes()...) - b.offsets.write(table.RawOffsets()) + if !isValidDict(int(table.Len()), int(out.Len())) { + return flat(table) + } + vec := flat(out) + if vec.Len() == 1 { + return vector.NewConst(vec, table.Len()) + } + return vector.NewDict(vec, index, counts) } -func (b *BytesEncoder) Encode(group *errgroup.Group) { +type BytesTableEncoder struct { + table vector.BytesTable + + // These values are used for the Encode pass. + bytesFmt uint8 + bytesOut []byte + bytesLen uint64 + offsets *Uint32Encoder +} + +func NewBytesTableEncoder(table vector.BytesTable) *BytesTableEncoder { + return &BytesTableEncoder{table: table} +} + +func (b *BytesTableEncoder) Encode(group *errgroup.Group) { group.Go(func() error { - fmt, out, err := compressBuffer(b.bytes) + bytes := b.table.RawBytes() + fmt, out, err := compressBuffer(bytes) if err != nil { return err } b.bytesFmt = fmt b.bytesOut = out - b.bytesLen = uint64(len(b.bytes)) - b.bytes = nil // send to GC + b.bytesLen = uint64(len(bytes)) return nil }) + b.offsets = NewUint32Encoder(b.table.RawOffsets()) b.offsets.Encode(group) } -func (b *BytesEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { +func (b *BytesTableEncoder) Segment(off uint64) (uint64, Segment, Segment) { bytesLoc := Segment{ Offset: off, Length: uint64(len(b.bytesOut)), @@ -85,17 +134,10 @@ func (b *BytesEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { CompressionFormat: b.bytesFmt, } off, offsLoc := b.offsets.Segment(off + bytesLoc.Length) - return off, cctx.enter(&Bytes{ - Typ: b.typ, - Bytes: bytesLoc, - Offsets: offsLoc, - Min: b.min, - Max: b.max, - Count: uint32(len(b.offsets.vals) - 1), - }) + return off, bytesLoc, offsLoc } -func (b *BytesEncoder) Emit(w io.Writer) error { +func (b *BytesTableEncoder) Emit(w io.Writer) error { if len(b.bytesOut) > 0 { if _, err := w.Write(b.bytesOut); err != nil { return err @@ -103,39 +145,3 @@ func (b *BytesEncoder) Emit(w io.Writer) error { } return b.offsets.Emit(w) } - -func (b *BytesEncoder) value(slot uint32) []byte { - return b.bytes[b.offsets.vals[slot]:b.offsets.vals[slot+1]] -} - -func (b *BytesEncoder) Dict() (PrimitiveEncoder, []byte, []uint32) { - if len(b.bytes) == 0 { - return nil, nil, nil - } - m := make(map[string]byte) - var counts []uint32 - index := make([]byte, len(b.offsets.vals)-1) - table := vector.NewBytesTableEmpty(256) - for k := range uint32(len(index)) { - tag, ok := m[string(b.value(k))] - if !ok { - tag = byte(len(counts)) - v := b.value(k) - m[string(v)] = tag - table.Append(v) - counts = append(counts, 0) - if len(counts) > math.MaxUint8 { - return nil, nil, nil - } - } - index[k] = tag - counts[tag]++ - } - encoder := NewBytesEncoder(b.typ) - encoder.Write(vector.NewBytes(table)) - return encoder, index, counts -} - -func (b *BytesEncoder) ConstValue() super.Value { - return super.NewValue(b.typ, b.value(0)) -} diff --git a/csup/const.go b/csup/const.go new file mode 100644 index 0000000000..f0ebe22c7c --- /dev/null +++ b/csup/const.go @@ -0,0 +1,21 @@ +package csup + +import ( + "io" + + "github.com/brimdata/super" + "golang.org/x/sync/errgroup" +) + +type ConstEncoder struct { + val super.Value + len uint32 +} + +func (*ConstEncoder) Encode(group *errgroup.Group) {} + +func (c *ConstEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { + return off, cctx.enter(&Const{c.val, c.len}) +} + +func (*ConstEncoder) Emit(w io.Writer) error { return nil } diff --git a/csup/dict.go b/csup/dict.go index 6ea3ba1e89..25170fecf6 100644 --- a/csup/dict.go +++ b/csup/dict.go @@ -3,70 +3,23 @@ package csup import ( "io" - "github.com/brimdata/super" "golang.org/x/sync/errgroup" ) type DictEncoder struct { - PrimitiveEncoder - typ super.Type - - // These fields are derived after Encode is called. + values Encoder counts *Uint32Encoder index []byte - const_ *Const -} - -func NewDictEncoder(typ super.Type, values PrimitiveEncoder) Encoder { - if id := typ.ID(); id == super.IDUint8 || id == super.IDInt8 || id == super.IDBool { - return values - } - return &DictEncoder{ - typ: typ, - PrimitiveEncoder: values, - } } func (d *DictEncoder) Encode(group *errgroup.Group) { - group.Go(func() error { - entries, index, counts := d.Dict() - if entries == nil { - d.PrimitiveEncoder.Encode(group) - return nil - } - if len(counts) == 1 { - d.const_ = &Const{ - Value: d.ConstValue(), - Count: uint32(len(index)), - } - return nil - } - if !isValidDict(len(index), len(counts)) { - d.PrimitiveEncoder.Encode(group) - return nil - } - d.index = index - d.PrimitiveEncoder = entries - d.counts = &Uint32Encoder{vals: counts} - d.PrimitiveEncoder.Encode(group) - d.counts.Encode(group) - return nil - }) -} - -func isValidDict(len, card int) bool { - return card >= 1 && card < len + d.values.Encode(group) + d.counts.Encode(group) } func (d *DictEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { - if d.const_ != nil { - return off, cctx.enter(d.const_) - } - if d.counts == nil { - return d.PrimitiveEncoder.Metadata(cctx, off) - } meta := &Dict{Length: uint32(len(d.index))} - off, meta.Values = d.PrimitiveEncoder.Metadata(cctx, off) + off, meta.Values = d.values.Metadata(cctx, off) off, meta.Counts = d.counts.Segment(off) len := uint64(len(d.index)) meta.Index = Segment{ @@ -78,15 +31,9 @@ func (d *DictEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { } func (d *DictEncoder) Emit(w io.Writer) error { - if d.const_ != nil { - return nil - } - if err := d.PrimitiveEncoder.Emit(w); err != nil { + if err := d.values.Emit(w); err != nil { return err } - if d.counts == nil { - return nil - } if err := d.counts.Emit(w); err != nil { return err } diff --git a/csup/dynamic.go b/csup/dynamic.go index dd323952ce..7578b3e365 100644 --- a/csup/dynamic.go +++ b/csup/dynamic.go @@ -3,81 +3,35 @@ package csup import ( "io" - "github.com/brimdata/super" "github.com/brimdata/super/vector" "golang.org/x/sync/errgroup" ) type DynamicEncoder struct { cctx *Context - tags []uint32 - tagEnc *Uint32Encoder + tags *Uint32Encoder values []Encoder - which map[super.Type]uint32 len uint32 } -func NewDynamicEncoder() *DynamicEncoder { - return &DynamicEncoder{ - cctx: NewContext(), - which: make(map[super.Type]uint32), - } -} - -// The dynamic encoder self-organizes around the types that are -// written to it. No need to define the schema up front! -// We track the types seen first-come, first-served and the -// CSUP metadata structure follows accordingly. -func (d *DynamicEncoder) Write(vec vector.Any) { - if vec.Len() == 0 { - return - } - if dynamic, ok := vec.(*vector.Dynamic); ok { - d.appendDynamic(dynamic) - } else { - d.appendVec(vec) - } -} - -func (d *DynamicEncoder) appendDynamic(vec *vector.Dynamic) { - var tagmap []uint32 // input tags to local tags - for _, vec := range vec.Values { - tagmap = append(tagmap, d.lookupType(vec.Type())) +func NewDynamicEncoder(vec *vector.Dynamic) *DynamicEncoder { + cctx := NewContext() + values := make([]Encoder, len(vec.Values)) + for i, val := range vec.Values { + values[i] = NewEncoder(cctx, val) } - for _, intag := range vec.Tags { - d.tags = append(d.tags, tagmap[intag]) - } - for intag, vec := range vec.Values { - d.values[tagmap[intag]].Write(vec) - } - d.len += vec.Len() -} - -func (d *DynamicEncoder) appendVec(vec vector.Any) { - tag := d.lookupType(vec.Type()) - for range vec.Len() { - //XXX there's a better way, but let's get this working - d.tags = append(d.tags, tag) - } - d.values[tag].Write(vec) - d.len += vec.Len() -} - -func (d *DynamicEncoder) lookupType(typ super.Type) uint32 { - tag, ok := d.which[typ] - if !ok { - tag = uint32(len(d.values)) - d.values = append(d.values, NewEncoder(d.cctx, typ)) - d.which[typ] = tag + return &DynamicEncoder{ + cctx: cctx, + tags: NewUint32Encoder(vec.Tags), + values: values, + len: vec.Len(), } - return tag } func (d *DynamicEncoder) Encode() (ID, uint64, error) { var group errgroup.Group - d.tagEnc = &Uint32Encoder{vals: d.tags} if len(d.values) > 1 { - d.tagEnc.Encode(&group) + d.tags.Encode(&group) } for _, val := range d.values { val.Encode(&group) @@ -90,7 +44,7 @@ func (d *DynamicEncoder) Encode() (ID, uint64, error) { return id, off, nil } values := make([]ID, 0, len(d.values)) - off, tags := d.tagEnc.Segment(0) + off, tags := d.tags.Segment(0) for _, val := range d.values { var id ID off, id = val.Metadata(d.cctx, off) @@ -105,7 +59,7 @@ func (d *DynamicEncoder) Encode() (ID, uint64, error) { func (d *DynamicEncoder) Emit(w io.Writer) error { if len(d.values) > 1 { - if err := d.tagEnc.Emit(w); err != nil { + if err := d.tags.Emit(w); err != nil { return err } } diff --git a/csup/encoder.go b/csup/encoder.go index 5d2ca700aa..53e21c7350 100644 --- a/csup/encoder.go +++ b/csup/encoder.go @@ -3,15 +3,16 @@ package csup import ( "fmt" "io" + "math" + "net/netip" "github.com/brimdata/super" + "github.com/brimdata/super/scode" "github.com/brimdata/super/vector" "golang.org/x/sync/errgroup" ) type Encoder interface { - // Write collects up values to be encoded into memory. - Write(vector.Any) // Encode encodes all in-memory vector data into its storage-ready serialized format. // Vectors may be encoded concurrently and errgroup.Group is used to sync // and return errors. @@ -28,57 +29,155 @@ type Encoder interface { Emit(w io.Writer) error } -type PrimitiveEncoder interface { - Encoder - Dict() (PrimitiveEncoder, []byte, []uint32) - ConstValue() super.Value +func NewEncoder(cctx *Context, vec vector.Any) Encoder { + switch vec := vec.(type) { + case *vector.Record: + return NewRecordEncoder(cctx, vec) + case *vector.Array: + return NewArrayEncoder(cctx, vec) + case *vector.Set: + return NewSetEncoder(cctx, vec) + case *vector.Map: + return NewMapEncoder(cctx, vec) + case *vector.Union: + return NewUnionEncoder(cctx, vec) + case *vector.Enum: + return NewEnumEncoder(vec) + case *vector.Error: + return &ErrorEncoder{NewEncoder(cctx, vec.Vals)} + case *vector.Named: + return NewNamedEncoder(cctx, vec) + case *vector.Fusion: + return NewFusionEncoder(cctx, vec) + default: + return NewPrimitiveEncoder(cctx, vec, true) + } } -func NewEncoder(cctx *Context, typ super.Type) Encoder { - switch typ := typ.(type) { - case *super.TypeNamed: - return &NamedEncoder{cctx: cctx, typ: typ} - case *super.TypeError: - return &ErrorEncoder{NewEncoder(cctx, typ.Type)} - case *super.TypeRecord: - return NewRecordEncoder(cctx, typ) - case *super.TypeArray: - return NewArrayEncoder(cctx, typ) - case *super.TypeSet: - // Sets encode the same way as arrays but behave - // differently semantically, and we don't care here. - return NewSetEncoder(cctx, typ) - case *super.TypeMap: - return NewMapEncoder(cctx, typ) - case *super.TypeUnion: - return NewUnionEncoder(cctx, typ) - case *super.TypeFusion: - return NewFusionEncoder(cctx, typ) - case *super.TypeEnum: - return NewPrimitiveEncoder(cctx, typ) - default: - if !super.IsPrimitiveType(typ) { - panic(fmt.Sprintf("unsupported type in CSUP file: %T", typ)) +func NewPrimitiveEncoder(cctx *Context, vec vector.Any, root bool) Encoder { + switch vec := vec.(type) { + case *vector.Dict: + return &DictEncoder{ + counts: NewUint32Encoder(vec.Counts), + values: NewPrimitiveEncoder(cctx, vec.Any, false), + index: vec.Index, + } + case *vector.Const: + return &ConstEncoder{ + val: vector.ValueAt(new(scode.Builder), vec.Any, 0), + len: vec.Len(), + } + case *vector.Uint: + if root { + // XXX This is a potential computationally intensive operation and + // should probable be move to the Encode pass where it can be + // parallelized. + out := maybeConvertToDictOrConst(vec.Values, func(vals []uint64) vector.Any { + return vector.NewUint(vec.Typ, vals) + }) + return NewPrimitiveEncoder(cctx, out, false) + } + return &UintEncoder{typ: vec.Typ, vals: vec.Values} + case *vector.Int: + if root { + out := maybeConvertToDictOrConst(vec.Values, func(vals []int64) vector.Any { + return vector.NewInt(vec.Typ, vals) + }) + return NewPrimitiveEncoder(cctx, out, false) + } + return &IntEncoder{typ: vec.Typ, vals: vec.Values} + case *vector.Float: + if root { + out := maybeConvertToDictOrConst(vec.Values, func(vals []float64) vector.Any { + return vector.NewFloat(vec.Typ, vals) + }) + return NewPrimitiveEncoder(cctx, out, false) + } + return NewFloatEncoder(vec.Typ, vec.Values) + case *vector.Bool: + // XXX can convert all trues and all falses to consts. + return NewBoolEncoder(vec) + case *vector.String: + if root { + out := maybeStringBytesDict(super.TypeString, vec.Table()) + return NewPrimitiveEncoder(cctx, out, false) + } + return NewBytesEncoder(super.TypeString, vec.Table()) + case *vector.Bytes: + if root { + out := maybeStringBytesDict(super.TypeBytes, vec.Table()) + return NewPrimitiveEncoder(cctx, out, false) + } + return NewBytesEncoder(super.TypeBytes, vec.Table()) + case *vector.IP: + if root { + out := maybeConvertToDictOrConst(vec.Values, func(vals []netip.Addr) vector.Any { + return vector.NewIP(vals) + }) + return NewPrimitiveEncoder(cctx, out, false) + } + return NewIPEncoder(vec.Values) + case *vector.Net: + if root { + out := maybeConvertToDictOrConst(vec.Values, func(vals []netip.Prefix) vector.Any { + return vector.NewNet(vals) + }) + return NewPrimitiveEncoder(cctx, out, false) } - return NewDictEncoder(typ, NewPrimitiveEncoder(cctx, typ)) + return NewNetEncoder(vec.Values) + case *vector.TypeValue: + if root { + out := maybeConvertToDictOrConst(vec.Types(), func(vals []super.Type) vector.Any { + return vector.NewTypeValue(vals) + }) + return NewPrimitiveEncoder(cctx, out, false) + } + return NewTypeValueEncoder(cctx, vec) + case *vector.Null: + return NewNullEncoder(vec.Len()) + case *vector.None: + return NewNoneEncoder(vec.Len()) + default: + panic(fmt.Sprintf("unsupported type in CSUP file: %T", vec)) } } -func NewPrimitiveEncoder(cctx *Context, typ super.Type) PrimitiveEncoder { - switch id := typ.ID(); { - case super.IsSigned(id): - return NewIntEncoder(typ) - case super.IsUnsigned(id): - return NewUintEncoder(typ) - case super.IsFloat(id): - return NewFloatEncoder(typ) - case id == super.IDBytes || id == super.IDString: - return NewBytesEncoder(typ) - case id == super.IDType: - return NewTypeValueEncoder(cctx) - default: - return NewScodeEncoder(typ) +func maybeConvertToDictOrConst[E comparable](in []E, fn func([]E) vector.Any) vector.Any { + vals, index, counts := comparableDict(in) + if vals == nil || !isValidDict(len(in), len(vals)) { + return fn(in) + } + flat := fn(vals) + if len(vals) == 1 { + return vector.NewConst(flat, counts[0]) + } + return vector.NewDict(flat, index, counts) +} + +func isValidDict(len, card int) bool { + return card >= 1 && card < len +} + +func comparableDict[T comparable](in []T) ([]T, []byte, []uint32) { + m := make(map[T]byte) + var counts []uint32 + index := make([]byte, len(in)) + var vals []T + for k, v := range in { + tag, ok := m[v] + if !ok { + if len(counts) > math.MaxUint8 { + return nil, nil, nil + } + tag = byte(len(counts)) + m[v] = tag + counts = append(counts, 0) + vals = append(vals, v) + } + index[k] = tag + counts[tag]++ } + return vals, index, counts } type ErrorEncoder struct { @@ -89,7 +188,3 @@ func (e *ErrorEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { off, id := e.Encoder.Metadata(cctx, off) return off, cctx.enter(&Error{id}) } - -func (e *ErrorEncoder) Write(vec vector.Any) { - e.Encoder.Write(vec.(*vector.Error).Vals) -} diff --git a/csup/enum.go b/csup/enum.go new file mode 100644 index 0000000000..857319e6bf --- /dev/null +++ b/csup/enum.go @@ -0,0 +1,23 @@ +package csup + +import ( + "github.com/brimdata/super" + "github.com/brimdata/super/vector" +) + +type EnumEncoder struct { + *UintEncoder + typ *super.TypeEnum +} + +func NewEnumEncoder(vec *vector.Enum) *EnumEncoder { + return &EnumEncoder{ + UintEncoder: &UintEncoder{typ: vec.Uint.Typ, vals: vec.Uint.Values}, + typ: vec.Typ, + } +} + +func (e *EnumEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { + off, values := e.UintEncoder.Metadata(cctx, off) + return off, cctx.enter(&Enum{Symbols: e.typ.Symbols, Values: values}) +} diff --git a/csup/field.go b/csup/field.go index 17c94c4e79..313b012e68 100644 --- a/csup/field.go +++ b/csup/field.go @@ -3,20 +3,14 @@ package csup import ( "io" - "github.com/brimdata/super/vector" "golang.org/x/sync/errgroup" ) -// XXX in a forthcoming PR, we will change this to OptionEncoder type FieldEncoder struct { name string values Encoder } -func (f *FieldEncoder) write(vec vector.Any) { - f.values.Write(vec) -} - func (f *FieldEncoder) Metadata(cctx *Context, off uint64) (uint64, Field) { var id ID off, id = f.values.Metadata(cctx, off) diff --git a/csup/float.go b/csup/float.go index 1d6b309bc2..7a8c6bc963 100644 --- a/csup/float.go +++ b/csup/float.go @@ -2,12 +2,10 @@ package csup import ( "io" - "math" "slices" "github.com/brimdata/super" "github.com/brimdata/super/pkg/byteconv" - "github.com/brimdata/super/vector" "golang.org/x/sync/errgroup" ) @@ -19,28 +17,8 @@ type FloatEncoder struct { fmt uint8 } -func NewFloatEncoder(typ super.Type) *FloatEncoder { - return &FloatEncoder{typ: typ} -} - -func (f *FloatEncoder) Write(vec vector.Any) { - if vec.Len() == 0 { - return - } - fv := vec.(*vector.Float) - if len(f.vals) == 0 { - f.min = fv.Values[0] - f.max = fv.Values[0] - } - for _, v := range fv.Values { - if v < f.min { - f.min = v - } - if v > f.max { - f.max = v - } - } - f.vals = append(f.vals, fv.Values...) +func NewFloatEncoder(typ super.Type, vals []float64) *FloatEncoder { + return &FloatEncoder{typ: typ, vals: vals} } func (f *FloatEncoder) Encode(group *errgroup.Group) { @@ -50,6 +28,12 @@ func (f *FloatEncoder) Encode(group *errgroup.Group) { f.fmt, f.out, err = compressBuffer(bytes) return err }) + if len(f.vals) > 0 { + group.Go(func() error { + f.min, f.max = minMax(f.vals) + return nil + }) + } } func (u *FloatEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { @@ -76,42 +60,3 @@ func (u *FloatEncoder) Emit(w io.Writer) error { } return err } - -func comparableDict[T comparable](in []T) ([]T, []byte, []uint32) { - m := make(map[T]byte) - var counts []uint32 - index := make([]byte, len(in)) - var vals []T - for k, v := range in { - tag, ok := m[v] - if !ok { - tag = byte(len(counts)) - m[v] = tag - counts = append(counts, 0) - vals = append(vals, v) - if len(counts) > math.MaxUint8 { - return nil, nil, nil - } - } - index[k] = tag - counts[tag]++ - } - return vals, index, counts -} - -func (f *FloatEncoder) Dict() (PrimitiveEncoder, []byte, []uint32) { - vals, index, count := comparableDict(f.vals) - if vals == nil { - return nil, nil, nil - } - return &FloatEncoder{ - typ: f.typ, - vals: vals, - min: f.min, - max: f.max, - }, index, count -} - -func (f *FloatEncoder) ConstValue() super.Value { - return super.NewFloat(f.typ, f.vals[0]) -} diff --git a/csup/fusion.go b/csup/fusion.go index 7be954ee32..c5d6976a14 100644 --- a/csup/fusion.go +++ b/csup/fusion.go @@ -3,38 +3,26 @@ package csup import ( "io" - "github.com/brimdata/super" "github.com/brimdata/super/vector" "golang.org/x/sync/errgroup" ) type FusionEncoder struct { - typ *super.TypeFusion values Encoder subtypes Encoder } var _ Encoder = (*FusionEncoder)(nil) -func NewFusionEncoder(cctx *Context, typ *super.TypeFusion) *FusionEncoder { +func NewFusionEncoder(cctx *Context, vec *vector.Fusion) *FusionEncoder { return &FusionEncoder{ - typ: typ, - values: NewEncoder(cctx, typ.Type), + values: NewEncoder(cctx, vec.Values), // Call NewTypeValueEncoder directly because we do not want subtypes // wrapped in a dict / const. - subtypes: NewTypeValueEncoder(cctx), + subtypes: NewTypeValueEncoder(cctx, vec.Subtypes), } } -func (f *FusionEncoder) Write(vec vector.Any) { - if vec.Len() == 0 { - return - } - fusion := vec.(*vector.Fusion) - f.values.Write(fusion.Values) - f.subtypes.Write(fusion.Subtypes) -} - func (f *FusionEncoder) Emit(w io.Writer) error { if err := f.values.Emit(w); err != nil { return err diff --git a/csup/header.go b/csup/header.go index fd4904597a..68586a726d 100644 --- a/csup/header.go +++ b/csup/header.go @@ -8,7 +8,7 @@ import ( ) const ( - Version = 21 + Version = 22 HeaderSize = 36 MaxMetaSize = 100 * 1024 * 1024 MaxTypeSize = 100 * 1024 * 1024 diff --git a/csup/int.go b/csup/int.go index ad990b4b83..70ea1b4c96 100644 --- a/csup/int.go +++ b/csup/int.go @@ -1,46 +1,30 @@ package csup import ( + "cmp" "io" "github.com/brimdata/super" "github.com/brimdata/super/pkg/byteconv" - "github.com/brimdata/super/vector" "github.com/ronanh/intcomp" "golang.org/x/sync/errgroup" ) type IntEncoder struct { - typ super.Type - vals []int64 - min, max int64 - out []byte -} + typ super.Type + vals []int64 -func NewIntEncoder(typ super.Type) *IntEncoder { - return &IntEncoder{ - typ: typ, - } + // computed after encode is called. + out []byte + min int64 + max int64 } -func (i *IntEncoder) Write(vec vector.Any) { - if vec.Len() == 0 { - return - } - iv := vec.(*vector.Int) - if len(i.vals) == 0 { - i.min = iv.Values[0] - i.max = iv.Values[0] - } - for _, v := range iv.Values { - if v < i.min { - i.min = v - } - if v > i.max { - i.max = v - } +func NewIntEncoder(typ super.Type, vals []int64) *IntEncoder { + return &IntEncoder{ + typ: typ, + vals: vals, } - i.vals = append(i.vals, iv.Values...) } func (i *IntEncoder) Encode(group *errgroup.Group) { @@ -49,6 +33,12 @@ func (i *IntEncoder) Encode(group *errgroup.Group) { i.out = byteconv.ReinterpretSlice[byte](compressed) return nil }) + if len(i.vals) > 0 { + group.Go(func() error { + i.min, i.max = minMax(i.vals) + return nil + }) + } } func (i *IntEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { @@ -76,23 +66,6 @@ func (i *IntEncoder) Emit(w io.Writer) error { return err } -func (i *IntEncoder) Dict() (PrimitiveEncoder, []byte, []uint32) { - entries, index, counts := comparableDict(i.vals) - if entries == nil { - return nil, nil, nil - } - return &IntEncoder{ - typ: i.typ, - vals: entries, - min: i.min, - max: i.max, - }, index, counts -} - -func (i *IntEncoder) ConstValue() super.Value { - return super.NewInt(i.typ, i.vals[0]) -} - type UintEncoder struct { typ super.Type vals []uint64 @@ -100,46 +73,41 @@ type UintEncoder struct { out []byte } -func NewUintEncoder(typ super.Type) *UintEncoder { - return &UintEncoder{typ: typ} -} - -func (u *UintEncoder) Write(vec vector.Any) { - if vec.Len() == 0 { - return - } - uv := vec.(*vector.Uint) - if len(u.vals) == 0 { - u.min = uv.Values[0] - u.max = uv.Values[0] - } - for _, v := range uv.Values { - if v < u.min { - u.min = v - } - if v > u.max { - u.max = v - } - } - u.vals = append(u.vals, uv.Values...) -} - func (u *UintEncoder) Encode(group *errgroup.Group) { group.Go(func() error { compressed := intcomp.CompressUint64(u.vals, nil) u.out = byteconv.ReinterpretSlice[byte](compressed) return nil }) + if len(u.vals) > 0 { + group.Go(func() error { + u.min, u.max = minMax(u.vals) + return nil + }) + } } -func (u *UintEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { +func minMax[T cmp.Ordered](vals []T) (T, T) { + minVal, maxVal := vals[0], vals[0] + for _, v := range vals { + minVal = min(minVal, v) + maxVal = max(maxVal, v) + } + return minVal, maxVal +} + +func (u *UintEncoder) Segment(off uint64) (uint64, Segment) { loc := Segment{ Offset: off, MemLength: uint64(len(u.out)), Length: uint64(len(u.vals)) * 8, CompressionFormat: CompressionFormatNone, } - off += loc.MemLength + return off + loc.MemLength, loc +} + +func (u *UintEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { + off, loc := u.Segment(off) return off, cctx.enter(&Uint{ Typ: u.typ, Location: loc, @@ -157,29 +125,16 @@ func (u *UintEncoder) Emit(w io.Writer) error { return err } -func (u *UintEncoder) Dict() (PrimitiveEncoder, []byte, []uint32) { - entries, index, counts := comparableDict(u.vals) - if entries == nil { - return nil, nil, nil - } - return &UintEncoder{ - typ: u.typ, - vals: entries, - min: u.min, - max: u.max, - }, index, counts -} - -func (u *UintEncoder) ConstValue() super.Value { - return super.NewUint(u.typ, u.vals[0]) -} - type Uint32Encoder struct { vals []uint32 out []byte bytesLen uint64 } +func NewUint32Encoder(vals []uint32) *Uint32Encoder { + return &Uint32Encoder{vals: vals} +} + func (u *Uint32Encoder) Write(v uint32) { u.vals = append(u.vals, v) } @@ -217,25 +172,6 @@ func (u *Uint32Encoder) Segment(off uint64) (uint64, Segment) { } } -type offsetsEncoder struct { - Uint32Encoder -} - -func newOffsetsEncoder() *offsetsEncoder { - return &offsetsEncoder{} -} - -func (o *offsetsEncoder) write(offsets []uint32) { - if len(o.vals) == 0 { - o.vals = offsets - } else { - base := o.vals[len(o.vals)-1] - for _, off := range offsets[1:] { - o.vals = append(o.vals, base+off) - } - } -} - func ReadUint32s(loc Segment, r io.ReaderAt) ([]uint32, error) { buf := make([]byte, loc.MemLength) if err := loc.Read(r, buf); err != nil { diff --git a/csup/ip.go b/csup/ip.go new file mode 100644 index 0000000000..61529acab3 --- /dev/null +++ b/csup/ip.go @@ -0,0 +1,67 @@ +package csup + +import ( + "io" + "net/netip" + + "github.com/brimdata/super/vector" + "golang.org/x/sync/errgroup" +) + +type IPEncoder struct { + vals []netip.Addr + table *BytesTableEncoder + min, max netip.Addr +} + +func NewIPEncoder(vals []netip.Addr) *IPEncoder { + return &IPEncoder{ + vals: vals, + } +} + +func (i *IPEncoder) Encode(group *errgroup.Group) { + group.Go(func() error { + var bytes []byte + offsets := []uint32{0} + for _, ip := range i.vals { + var err error + if bytes, err = ip.AppendBinary(bytes); err != nil { + panic(err) + } + offsets = append(offsets, uint32(len(bytes))) + } + i.table = NewBytesTableEncoder(vector.NewBytesTable(offsets, bytes)) + i.table.Encode(group) + return nil + }) + if len(i.vals) > 0 { + group.Go(func() error { + i.min, i.max = i.vals[0], i.vals[0] + for _, v := range i.vals { + if v.Compare(i.min) < 0 { + i.min = v + } + if v.Compare(i.max) > 0 { + i.max = v + } + } + return nil + }) + } +} + +func (i *IPEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { + off, bytesLoc, offsLoc := i.table.Segment(off) + return off, cctx.enter(&IP{ + Bytes: bytesLoc, + Offsets: offsLoc, + Min: i.min, + Max: i.max, + Count: uint32(len(i.vals)), + }) +} + +func (i *IPEncoder) Emit(w io.Writer) error { + return i.table.Emit(w) +} diff --git a/csup/map.go b/csup/map.go index 42f5c312b7..6941998795 100644 --- a/csup/map.go +++ b/csup/map.go @@ -3,7 +3,6 @@ package csup import ( "io" - "github.com/brimdata/super" "github.com/brimdata/super/vector" "golang.org/x/sync/errgroup" ) @@ -11,29 +10,18 @@ import ( type MapEncoder struct { keys Encoder values Encoder - offsets *offsetsEncoder + offsets *Uint32Encoder count uint32 } -func NewMapEncoder(cctx *Context, typ *super.TypeMap) *MapEncoder { +func NewMapEncoder(cctx *Context, vec *vector.Map) *MapEncoder { return &MapEncoder{ - keys: NewEncoder(cctx, typ.KeyType), - values: NewEncoder(cctx, typ.ValType), - offsets: newOffsetsEncoder(), + keys: NewEncoder(cctx, vec.Keys), + values: NewEncoder(cctx, vec.Values), + offsets: NewUint32Encoder(vec.Offsets), } } -func (m *MapEncoder) Write(vec vector.Any) { - if vec.Len() == 0 { - return - } - mapVec := vec.(*vector.Map) - m.count += vec.Len() - m.keys.Write(mapVec.Keys) - m.values.Write(mapVec.Values) - m.offsets.write(mapVec.Offsets) -} - func (m *MapEncoder) Emit(w io.Writer) error { if err := m.offsets.Emit(w); err != nil { return err diff --git a/csup/metadata.go b/csup/metadata.go index 13c8ac5733..a8e6337901 100644 --- a/csup/metadata.go +++ b/csup/metadata.go @@ -1,6 +1,7 @@ package csup import ( + "net/netip" "slices" "github.com/brimdata/super" @@ -91,6 +92,15 @@ func (u *Union) Len(*Context) uint32 { return u.Length } +type Enum struct { + Symbols []string + Values ID +} + +func (e *Enum) Len(cctx *Context) uint32 { + return cctx.Lookup(e.Values).Len(cctx) +} + type Named struct { Name string Values ID @@ -165,6 +175,15 @@ func (f *Float) Len(*Context) uint32 { return f.Count } +type Bool struct { + Location Segment + Count uint32 +} + +func (b *Bool) Len(*Context) uint32 { + return b.Count +} + type Bytes struct { Typ super.Type `super:"Type"` Bytes Segment @@ -191,21 +210,44 @@ func (t *TypeValue) Len(*Context) uint32 { return t.Length } -type Primitive struct { - Typ super.Type `super:"Type"` - Location Segment - MinMax bool - Min super.Value - Max super.Value - Count uint32 +type IP struct { + Bytes Segment + Offsets Segment + Min netip.Addr + Max netip.Addr + Count uint32 +} + +func (n *IP) Len(*Context) uint32 { + return n.Count +} + +type Net struct { + Bytes Segment + Offsets Segment + Min netip.Prefix + Max netip.Prefix + Count uint32 } -func (p *Primitive) Type(*Context, *super.Context) super.Type { - return p.Typ +func (n *Net) Len(*Context) uint32 { + return n.Count +} + +type Null struct { + Count uint32 +} + +func (n *Null) Len(*Context) uint32 { + return n.Count +} + +type None struct { + Count uint32 } -func (p *Primitive) Len(*Context) uint32 { - return p.Count +func (n *None) Len(*Context) uint32 { + return n.Count } type Const struct { @@ -277,8 +319,6 @@ func metadataValue(cctx *Context, sctx *super.Context, b *scode.Builder, id ID, } b.EndContainer() return sctx.MustLookupTypeRecord(fields) - case *Primitive: - return metadataLeaf(sctx, b, m.Min, m.Max) case *Int: return metadataLeaf(sctx, b, super.NewInt(m.Typ, m.Min), super.NewInt(m.Typ, m.Max)) case *Uint: @@ -287,6 +327,10 @@ func metadataValue(cctx *Context, sctx *super.Context, b *scode.Builder, id ID, return metadataLeaf(sctx, b, super.NewFloat(m.Typ, m.Min), super.NewFloat(m.Typ, m.Max)) case *Bytes: return metadataLeaf(sctx, b, super.NewValue(m.Typ, m.Min), super.NewValue(m.Typ, m.Max)) + case *IP: + return metadataLeaf(sctx, b, super.NewIP(m.Min), super.NewIP(m.Max)) + case *Net: + return metadataLeaf(sctx, b, super.NewNet(m.Min), super.NewNet(m.Max)) case *Const: return metadataLeaf(sctx, b, m.Value, m.Value) default: @@ -322,7 +366,6 @@ var Template = []any{ Uint{}, Float{}, Bytes{}, - Primitive{}, TypeValue{}, Named{}, Error{}, @@ -331,4 +374,10 @@ var Template = []any{ Dynamic{}, Fusion{}, Empty{}, + Enum{}, + Bool{}, + IP{}, + Net{}, + Null{}, + None{}, } diff --git a/csup/named.go b/csup/named.go index 879bdc8cc4..9b5a158272 100644 --- a/csup/named.go +++ b/csup/named.go @@ -10,26 +10,15 @@ import ( type NamedEncoder struct { encoder Encoder - cctx *Context typ *super.TypeNamed } -func (n *NamedEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { - if n.encoder == nil { - return off, cctx.enter(&Empty{Type: n.typ}) - } - off, id := n.encoder.Metadata(cctx, off) - return off, cctx.enter(&Named{n.typ.Name, id}) -} - -func (n *NamedEncoder) Write(vec vector.Any) { - if vec.Len() == 0 { - return - } - if n.encoder == nil { - n.encoder = NewEncoder(n.cctx, n.typ.Type) +func NewNamedEncoder(cctx *Context, named *vector.Named) Encoder { + e := &NamedEncoder{typ: named.Typ} + if _, ok := named.Any.(*vector.Empty); !ok { + e.encoder = NewEncoder(cctx, named.Any) } - n.encoder.Write(vec.(*vector.Named).Any) + return e } func (n *NamedEncoder) Encode(group *errgroup.Group) { @@ -38,6 +27,14 @@ func (n *NamedEncoder) Encode(group *errgroup.Group) { } } +func (n *NamedEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { + if n.encoder == nil { + return off, cctx.enter(&Empty{Type: n.typ}) + } + off, id := n.encoder.Metadata(cctx, off) + return off, cctx.enter(&Named{n.typ.Name, id}) +} + func (n *NamedEncoder) Emit(w io.Writer) error { if n.encoder != nil { return n.encoder.Emit(w) diff --git a/csup/net.go b/csup/net.go new file mode 100644 index 0000000000..bdc97a3145 --- /dev/null +++ b/csup/net.go @@ -0,0 +1,67 @@ +package csup + +import ( + "io" + "net/netip" + + "github.com/brimdata/super/vector" + "golang.org/x/sync/errgroup" +) + +type NetEncoder struct { + vals []netip.Prefix + table *BytesTableEncoder + min, max netip.Prefix +} + +func NewNetEncoder(vals []netip.Prefix) *NetEncoder { + return &NetEncoder{ + vals: vals, + } +} + +func (n *NetEncoder) Encode(group *errgroup.Group) { + group.Go(func() error { + var bytes []byte + offsets := []uint32{0} + for _, net := range n.vals { + var err error + if bytes, err = net.AppendBinary(bytes); err != nil { + panic(err) + } + offsets = append(offsets, uint32(len(bytes))) + } + n.table = NewBytesTableEncoder(vector.NewBytesTable(offsets, bytes)) + n.table.Encode(group) + return nil + }) + if len(n.vals) > 0 { + group.Go(func() error { + n.min, n.max = n.vals[0], n.vals[0] + for _, v := range n.vals { + if v.Compare(n.min) < 0 { + n.min = v + } + if v.Compare(n.max) > 0 { + n.max = v + } + } + return nil + }) + } +} + +func (n *NetEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { + off, bytesLoc, offsLoc := n.table.Segment(off) + return off, cctx.enter(&Net{ + Bytes: bytesLoc, + Offsets: offsLoc, + Min: n.min, + Max: n.max, + Count: uint32(len(n.vals)), + }) +} + +func (n *NetEncoder) Emit(w io.Writer) error { + return n.table.Emit(w) +} diff --git a/csup/none.go b/csup/none.go new file mode 100644 index 0000000000..89d769eb07 --- /dev/null +++ b/csup/none.go @@ -0,0 +1,22 @@ +package csup + +import ( + "io" + + "golang.org/x/sync/errgroup" +) + +type NoneEncoder struct { + len uint32 +} + +func NewNoneEncoder(len uint32) *NoneEncoder { + return &NoneEncoder{len} +} + +func (n *NoneEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { + return off, cctx.enter(&None{n.len}) +} + +func (*NoneEncoder) Encode(group *errgroup.Group) {} +func (*NoneEncoder) Emit(io.Writer) error { return nil } diff --git a/csup/null.go b/csup/null.go new file mode 100644 index 0000000000..e5d532371c --- /dev/null +++ b/csup/null.go @@ -0,0 +1,22 @@ +package csup + +import ( + "io" + + "golang.org/x/sync/errgroup" +) + +type NullEncoder struct { + len uint32 +} + +func NewNullEncoder(len uint32) *NullEncoder { + return &NullEncoder{len} +} + +func (n *NullEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { + return off, cctx.enter(&Null{n.len}) +} + +func (*NullEncoder) Encode(*errgroup.Group) {} +func (*NullEncoder) Emit(io.Writer) error { return nil } diff --git a/csup/record.go b/csup/record.go index ea46c21176..20ca6c30f1 100644 --- a/csup/record.go +++ b/csup/record.go @@ -3,7 +3,6 @@ package csup import ( "io" - "github.com/brimdata/super" "github.com/brimdata/super/vector" "golang.org/x/sync/errgroup" ) @@ -15,26 +14,15 @@ type RecordEncoder struct { var _ Encoder = (*RecordEncoder)(nil) -func NewRecordEncoder(cctx *Context, typ *super.TypeRecord) *RecordEncoder { - fields := make([]*FieldEncoder, 0, len(typ.Fields)) - for _, f := range typ.Fields { +func NewRecordEncoder(cctx *Context, vec *vector.Record) *RecordEncoder { + fields := make([]*FieldEncoder, 0, len(vec.Fields)) + for i, f := range vec.Fields { fields = append(fields, &FieldEncoder{ - name: f.Name, - values: NewEncoder(cctx, f.Type), + name: vec.Typ.Fields[i].Name, + values: NewEncoder(cctx, f), }) } - return &RecordEncoder{fields: fields} -} - -func (r *RecordEncoder) Write(vec vector.Any) { - if vec.Len() == 0 { - return - } - rec := vec.(*vector.Record) - r.count += rec.Len() - for k, f := range r.fields { - f.write(rec.Fields[k]) - } + return &RecordEncoder{fields: fields, count: vec.Len()} } func (r *RecordEncoder) Encode(group *errgroup.Group) { diff --git a/csup/scode.go b/csup/scode.go deleted file mode 100644 index 99db5a6240..0000000000 --- a/csup/scode.go +++ /dev/null @@ -1,143 +0,0 @@ -package csup - -import ( - "io" - "math" - - "github.com/brimdata/super" - "github.com/brimdata/super/order" - "github.com/brimdata/super/runtime/sam/expr" - "github.com/brimdata/super/scode" - "github.com/brimdata/super/vector" - "golang.org/x/sync/errgroup" -) - -const MaxDictSize = 256 - -type ScodeEncoder struct { - typ super.Type - bytes scode.Bytes - cmp expr.CompareFn - min super.Value - max super.Value - minmax bool - count uint32 - - // fields used after Encode is called - bytesLen uint64 - format uint8 - out []byte -} - -func NewScodeEncoder(typ super.Type) *ScodeEncoder { - return &ScodeEncoder{ - typ: typ, - cmp: expr.NewValueCompareFn(order.Asc, order.NullsFirst), - min: super.Null, - max: super.Null, - } -} - -// XXX TBD: change all the scode primitives to be native and get rid of -// this slow path here. -func (p *ScodeEncoder) Write(vec vector.Any) { - if vec.Len() == 0 { - return - } - var b scode.Builder - for slot := range vec.Len() { - b.Reset() - vec.Serialize(&b, slot) - body := b.Bytes().Body() - p.update(body) - p.bytes = scode.Append(p.bytes, body) - } -} - -func (p *ScodeEncoder) WriteBytes(bytes scode.Bytes) { - p.update(bytes) - p.bytes = scode.Append(p.bytes, bytes) -} - -func (p *ScodeEncoder) update(body scode.Bytes) { - p.count++ - val := super.NewValue(p.typ, body) - if !p.minmax || p.cmp(val, p.min) < 0 { - p.min = val.Copy() - } - if !p.minmax || p.cmp(val, p.max) > 0 { - p.max = val.Copy() - } - p.minmax = true -} - -func (p *ScodeEncoder) Encode(group *errgroup.Group) { - group.Go(func() error { - fmt, out, err := compressBuffer(p.bytes) - if err != nil { - return err - } - p.format = fmt - p.out = out - p.bytesLen = uint64(len(p.bytes)) - p.bytes = nil // send to GC - return nil - }) -} - -func (p *ScodeEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { - loc := Segment{ - Offset: off, - Length: uint64(len(p.out)), - MemLength: p.bytesLen, - CompressionFormat: p.format, - } - off += uint64(len(p.out)) - return off, cctx.enter(&Primitive{ - Typ: p.typ, - Location: loc, - Count: p.count, - MinMax: p.minmax, - Min: p.min, - Max: p.max, - }) -} - -func (p *ScodeEncoder) Emit(w io.Writer) error { - var err error - if len(p.out) > 0 { - _, err = w.Write(p.out) - } - return err -} - -func (p *ScodeEncoder) Dict() (PrimitiveEncoder, []byte, []uint32) { - m := make(map[string]byte) - var counts []uint32 - index := make([]byte, p.count) - entries := NewScodeEncoder(p.typ) - var k uint32 - it := p.bytes.Iter() - for !it.Done() { - v := it.Next() - tag, ok := m[string(v)] - if !ok { - tag = byte(len(counts)) - m[string(v)] = tag - counts = append(counts, 0) - entries.WriteBytes(v) - if len(counts) > math.MaxUint8 { - return nil, nil, nil - } - } - index[k] = tag - counts[tag]++ - k++ - } - return entries, index, counts -} - -func (p *ScodeEncoder) ConstValue() super.Value { - it := p.bytes.Iter() - return super.NewValue(p.typ, it.Next()) -} diff --git a/csup/type.go b/csup/type.go index b95ec94321..648eafd761 100644 --- a/csup/type.go +++ b/csup/type.go @@ -10,23 +10,17 @@ import ( type TypeValueEncoder struct { cctx *Context - ids []uint32 encoder *Uint32Encoder } -func NewTypeValueEncoder(cctx *Context) PrimitiveEncoder { - return &TypeValueEncoder{cctx: cctx} -} - -func (t *TypeValueEncoder) Write(vec vector.Any) { - types := vec.(*vector.TypeValue) - defs, ids := types.TypeIDs() - merger := super.NewTypeDefsMerger(t.cctx.TypeDefs(), defs) +func NewTypeValueEncoder(cctx *Context, vec *vector.TypeValue) Encoder { + defs, ids := vec.TypeIDs() + merger := super.NewTypeDefsMerger(cctx.TypeDefs(), defs) mapped := make([]uint32, 0, len(ids)) for _, localID := range ids { mapped = append(mapped, merger.LookupID(localID)) } - t.ids = append(t.ids, mapped...) + return &TypeValueEncoder{cctx: cctx, encoder: NewUint32Encoder(mapped)} } func (t *TypeValueEncoder) Emit(w io.Writer) error { @@ -34,7 +28,6 @@ func (t *TypeValueEncoder) Emit(w io.Writer) error { } func (t *TypeValueEncoder) Encode(group *errgroup.Group) { - t.encoder = &Uint32Encoder{vals: t.ids} t.encoder.Encode(group) } @@ -44,18 +37,3 @@ func (t *TypeValueEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { Location: loc, }) } - -func (t *TypeValueEncoder) Dict() (PrimitiveEncoder, []byte, []uint32) { - entries, index, counts := comparableDict(t.ids) - if entries == nil { - return nil, nil, nil - } - return &TypeValueEncoder{ - ids: entries, - }, index, counts -} - -func (t *TypeValueEncoder) ConstValue() super.Value { - typ := super.NewTypeDefsMapper(t.cctx.local, t.cctx.typedefs).LookupType(t.ids[0]) - return super.NewValue(super.TypeType, super.EncodeTypeValue(typ)) -} diff --git a/csup/union.go b/csup/union.go index bd63bf0224..999430e0cb 100644 --- a/csup/union.go +++ b/csup/union.go @@ -3,119 +3,38 @@ package csup import ( "io" - "github.com/brimdata/super" - "github.com/brimdata/super/sup" "github.com/brimdata/super/vector" "golang.org/x/sync/errgroup" ) type UnionEncoder struct { - typ *super.TypeUnion - values []Encoder - count uint32 - tags []uint32 - tagEnc *Uint32Encoder + values []Encoder + rleOrTags *Uint32Encoder + count uint32 } var _ Encoder = (*UnionEncoder)(nil) -func NewUnionEncoder(cctx *Context, typ *super.TypeUnion) *UnionEncoder { +func NewUnionEncoder(cctx *Context, vec *vector.Union) *UnionEncoder { var values []Encoder - for _, typ := range typ.Types { - values = append(values, NewEncoder(cctx, typ)) + for _, val := range vec.Values() { + values = append(values, NewEncoder(cctx, val)) } - return &UnionEncoder{ - typ: typ, - values: values, - } -} - -func (u *UnionEncoder) Write(vec vector.Any) { - if vec.Len() == 0 { - return - } - union := vec.(*vector.Union) - u.count += vec.Len() - // Union vectors do not require that the values slice has - // alignment with the types in the union type. Thus, we can - // have vectors land here that have different orderings for - // the same union type. We could optimize this by adopting the - // order of the first vector and recomputing the tags for each - // subsequent incoming vector so that we don't have to rewrite - // the tags of the first vector, but for now, we just map - // everything to canonical order of the union types. - var vecs []vector.Any - if len(union.Typ.Types) == 2 { - // Code tags as run lengths. - rle := union.TagsRLE() - if rle == nil { - // Encoder returns nil for all tag 0 - rle = []uint32{0, vec.Len()} - } - // RLEs have the nice property that you can just concatenate them - // to append two vectors. - vecs, rle = reorderRLE(union, rle) - u.tags = append(u.tags, rle...) - } else { - var tags []uint32 - vecs, tags = reorder(union) - u.tags = append(u.tags, tags...) - } - for k, vec := range vecs { - if vec != nil && vec.Len() != 0 { - u.values[k].Write(vec) - } - } -} - -func reorderRLE(union *vector.Union, rle []uint32) ([]vector.Any, []uint32) { - vecs := union.Values() - if canonOrder(union.Typ, vecs) { - return vecs, rle - } - if rle[0] == 0 { - rle = rle[1:] + var rleOrTags []uint32 + if len(vec.Typ.Types) == 2 { + rleOrTags = vec.TagsRLE() } else { - rle = append([]uint32{0}, rle...) - } - return []vector.Any{vecs[1], vecs[0]}, rle -} - -func reorder(union *vector.Union) ([]vector.Any, []uint32) { - vecs := union.Values() - if canonOrder(union.Typ, vecs) { - return vecs, union.Tags() - } - tagmap := make([]uint32, len(vecs)) - for inTag, vec := range vecs { - localTag := union.Typ.TagOf(vec.Type()) - if localTag < 0 { - panic(sup.String(vec.Type())) - } - tagmap[inTag] = uint32(localTag) + rleOrTags = vec.Tags() } - tags := make([]uint32, len(union.Tags())) - for k, intag := range union.Tags() { - tags[k] = tagmap[intag] - } - vals := make([]vector.Any, len(union.Typ.Types)) - for inTag, v := range union.Values() { - vals[tagmap[inTag]] = v - } - return vals, tags -} - -func canonOrder(typ *super.TypeUnion, vecs []vector.Any) bool { - for inTag, vec := range vecs { - if inTag != typ.TagOf(vec.Type()) { - return false - } + return &UnionEncoder{ + values: values, + rleOrTags: NewUint32Encoder(rleOrTags), + count: vec.Len(), } - return true } func (u *UnionEncoder) Emit(w io.Writer) error { - if err := u.tagEnc.Emit(w); err != nil { + if err := u.rleOrTags.Emit(w); err != nil { return err } for _, value := range u.values { @@ -127,15 +46,14 @@ func (u *UnionEncoder) Emit(w io.Writer) error { } func (u *UnionEncoder) Encode(group *errgroup.Group) { - u.tagEnc = &Uint32Encoder{vals: u.tags} - u.tagEnc.Encode(group) + u.rleOrTags.Encode(group) for _, value := range u.values { value.Encode(group) } } func (u *UnionEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { - off, tags := u.tagEnc.Segment(off) + off, tags := u.rleOrTags.Segment(off) values := make([]ID, 0, len(u.values)) for _, val := range u.values { var id ID diff --git a/csup/writer.go b/csup/writer.go index 53f52f09cc..11357bf64d 100644 --- a/csup/writer.go +++ b/csup/writer.go @@ -10,6 +10,7 @@ import ( "github.com/brimdata/super/sio/bsupio" "github.com/brimdata/super/sup" "github.com/brimdata/super/vector" + "github.com/brimdata/super/vector/vbuild" "github.com/brimdata/super/vector/vio" ) @@ -19,7 +20,7 @@ var maxObjectSize uint32 = 120_000 // CSUP object from a stream of vector.Any. type Serializer struct { writer io.WriteCloser - dynamic *DynamicEncoder + dynamic *vbuild.DynamicBuilder } var _ vio.Pusher = (*Serializer)(nil) @@ -27,7 +28,7 @@ var _ vio.Pusher = (*Serializer)(nil) func NewSerializer(w io.WriteCloser) *Serializer { return &Serializer{ writer: w, - dynamic: NewDynamicEncoder(), + dynamic: vbuild.NewDynamicBuilder(), } } @@ -42,7 +43,7 @@ func (w *Serializer) Close() error { func (w *Serializer) Push(vec vector.Any) error { if vec.Len() != 0 { w.dynamic.Write(vec) - if w.dynamic.len >= maxObjectSize { + if w.dynamic.Len() >= maxObjectSize { return w.finalizeObject() } } @@ -50,7 +51,8 @@ func (w *Serializer) Push(vec vector.Any) error { } func (w *Serializer) finalizeObject() error { - root, dataSize, err := w.dynamic.Encode() + enc := NewDynamicEncoder(w.dynamic.BuildDynamic()) + root, dataSize, err := enc.Encode() if err != nil { return fmt.Errorf("system error: could not encode CSUP metadata: %w", err) } @@ -59,7 +61,7 @@ func (w *Serializer) finalizeObject() error { var metaBuf bytes.Buffer zw := bsupio.NewWriter(sio.NopCloser(&metaBuf)) // First, we write the root segmap of the vector of integer type IDs. - cctx := w.dynamic.cctx + cctx := enc.cctx m := sup.NewBSUPMarshalerWithContext(cctx.local) m.Decorate(sup.StyleSimple) for id := range len(cctx.metas) { @@ -88,11 +90,11 @@ func (w *Serializer) finalizeObject() error { return fmt.Errorf("system error: could not write CSUP metadata section: %w", err) } // Data section - if err := w.dynamic.Emit(w.writer); err != nil { + if err := enc.Emit(w.writer); err != nil { return fmt.Errorf("system error: could not write CSUP data section: %w", err) } // Set new dynamic so we can write the next object. - w.dynamic = NewDynamicEncoder() + w.dynamic = vbuild.NewDynamicBuilder() return nil } diff --git a/csup/ztests/const.yaml b/csup/ztests/const.yaml index 5cee716909..426c3dda99 100644 --- a/csup/ztests/const.yaml +++ b/csup/ztests/const.yaml @@ -12,6 +12,6 @@ inputs: outputs: - name: stdout data: | - {Version:21::uint32,MetaSize:45::uint64,TypeSize:6::uint64,DataSize:0::uint64,Root:0::uint32} + {Version:22::uint32,MetaSize:45::uint64,TypeSize:6::uint64,DataSize:0::uint64,Root:0::uint32} type Const={Value:fusion(all),Count:uint32} {Value:fusion(0x02,),Count:3}::Const diff --git a/runtime/vcache/bool.go b/runtime/vcache/bool.go new file mode 100644 index 0000000000..a734c860c4 --- /dev/null +++ b/runtime/vcache/bool.go @@ -0,0 +1,49 @@ +package vcache + +import ( + "sync" + + "github.com/brimdata/super/csup" + "github.com/brimdata/super/pkg/byteconv" + "github.com/brimdata/super/pkg/field" + "github.com/brimdata/super/vector" + "github.com/brimdata/super/vector/bitvec" +) + +type bool_ struct { + mu sync.Mutex + meta *csup.Bool + bits *bitvec.Bits +} + +func newBool(meta *csup.Bool) *bool_ { + return &bool_{meta: meta} +} + +func (b *bool_) length() uint32 { + return b.meta.Count +} + +func (*bool_) unmarshal(*csup.Context, field.Projection) {} + +func (b *bool_) project(loader *loader, projection field.Projection) vector.Any { + if len(projection) > 0 { + return vector.NewMissing(loader.sctx, b.length()) + } + return vector.NewBool(b.load(loader)) +} + +func (b *bool_) load(loader *loader) bitvec.Bits { + b.mu.Lock() + defer b.mu.Unlock() + if b.bits != nil { + return *b.bits + } + bytes := make([]byte, b.meta.Location.MemLength) + if err := b.meta.Location.Read(loader.r, bytes); err != nil { + panic(err) + } + bits := bitvec.New(byteconv.ReinterpretSlice[uint64](bytes), b.meta.Count) + b.bits = &bits + return bits +} diff --git a/runtime/vcache/bytes.go b/runtime/vcache/bytes.go index a30c3c2725..80b1dd02ba 100644 --- a/runtime/vcache/bytes.go +++ b/runtime/vcache/bytes.go @@ -47,18 +47,22 @@ func (b *bytes) load(loader *loader) vector.BytesTable { if b.table != nil { return *b.table } - offs, err := csup.ReadUint32s(b.meta.Offsets, loader.r) + table := loadBytesTable(loader, b.meta.Offsets, b.meta.Bytes) + b.table = &table + return table +} + +func loadBytesTable(loader *loader, offsets, bytes csup.Segment) vector.BytesTable { + offs, err := csup.ReadUint32s(offsets, loader.r) if err != nil { panic(err) } if len(offs) == 0 { offs = []uint32{0} } - bytes := make([]byte, b.meta.Bytes.MemLength) - if err := b.meta.Bytes.Read(loader.r, bytes); err != nil { + b := make([]byte, bytes.MemLength) + if err := bytes.Read(loader.r, b); err != nil { panic(err) } - table := vector.NewBytesTable(offs, bytes) - b.table = &table - return table + return vector.NewBytesTable(offs, b) } diff --git a/runtime/vcache/enum.go b/runtime/vcache/enum.go new file mode 100644 index 0000000000..311d5b3a92 --- /dev/null +++ b/runtime/vcache/enum.go @@ -0,0 +1,33 @@ +package vcache + +import ( + "github.com/brimdata/super/csup" + "github.com/brimdata/super/pkg/field" + "github.com/brimdata/super/vector" +) + +type enum struct { + meta *csup.Enum + values shadow +} + +func (e *enum) length() uint32 { + return e.values.length() +} + +func newEnum(meta *csup.Enum, values shadow) *enum { + return &enum{ + meta: meta, + values: values, + } +} + +func (e *enum) unmarshal(cctx *csup.Context, projection field.Projection) { + e.values.unmarshal(cctx, projection) +} + +func (e *enum) project(loader *loader, projection field.Projection) vector.Any { + vec := e.values.project(loader, projection).(*vector.Uint) + enum := loader.sctx.LookupTypeEnum(e.meta.Symbols) + return &vector.Enum{Uint: vec, Typ: enum} +} diff --git a/runtime/vcache/ip.go b/runtime/vcache/ip.go new file mode 100644 index 0000000000..aaafb1d034 --- /dev/null +++ b/runtime/vcache/ip.go @@ -0,0 +1,51 @@ +package vcache + +import ( + "net/netip" + "sync" + + "github.com/brimdata/super/csup" + "github.com/brimdata/super/pkg/field" + "github.com/brimdata/super/vector" +) + +type ip struct { + mu sync.Mutex + meta *csup.IP + vals []netip.Addr +} + +func newIP(meta *csup.IP) *ip { + return &ip{meta: meta} +} + +func (i *ip) length() uint32 { + return i.meta.Count +} + +func (*ip) unmarshal(*csup.Context, field.Projection) {} + +func (i *ip) project(loader *loader, projection field.Projection) vector.Any { + if len(projection) > 0 { + return vector.NewMissing(loader.sctx, i.length()) + } + return vector.NewIP(i.load(loader)) +} + +func (i *ip) load(loader *loader) []netip.Addr { + i.mu.Lock() + defer i.mu.Unlock() + if i.vals != nil { + return i.vals + } + i.vals = make([]netip.Addr, i.meta.Count) + table := loadBytesTable(loader, i.meta.Offsets, i.meta.Bytes) + for k := range table.Len() { + var ok bool + if i.vals[k], ok = netip.AddrFromSlice(table.Bytes(k)); !ok { + panic("malformed ip block") + } + + } + return i.vals +} diff --git a/runtime/vcache/net.go b/runtime/vcache/net.go new file mode 100644 index 0000000000..52af85a28e --- /dev/null +++ b/runtime/vcache/net.go @@ -0,0 +1,50 @@ +package vcache + +import ( + "net/netip" + "sync" + + "github.com/brimdata/super/csup" + "github.com/brimdata/super/pkg/field" + "github.com/brimdata/super/vector" +) + +type net struct { + mu sync.Mutex + meta *csup.Net + vals []netip.Prefix +} + +func newNet(meta *csup.Net) *net { + return &net{meta: meta} +} + +func (n *net) length() uint32 { + return n.meta.Count +} + +func (*net) unmarshal(*csup.Context, field.Projection) {} + +func (n *net) project(loader *loader, projection field.Projection) vector.Any { + if len(projection) > 0 { + return vector.NewMissing(loader.sctx, n.length()) + } + return vector.NewNet(n.load(loader)) +} + +func (n *net) load(loader *loader) []netip.Prefix { + n.mu.Lock() + defer n.mu.Unlock() + if n.vals != nil { + return n.vals + } + n.vals = make([]netip.Prefix, n.meta.Count) + table := loadBytesTable(loader, n.meta.Offsets, n.meta.Bytes) + for k := range table.Len() { + if err := n.vals[k].UnmarshalBinary(table.Bytes(k)); err != nil { + panic(err) + } + + } + return n.vals +} diff --git a/runtime/vcache/none.go b/runtime/vcache/none.go new file mode 100644 index 0000000000..513ea4c4db --- /dev/null +++ b/runtime/vcache/none.go @@ -0,0 +1,28 @@ +package vcache + +import ( + "github.com/brimdata/super/csup" + "github.com/brimdata/super/pkg/field" + "github.com/brimdata/super/vector" +) + +type none struct { + meta *csup.None +} + +func newNone(meta *csup.None) *none { + return &none{meta: meta} +} + +func (n *none) length() uint32 { + return n.meta.Count +} + +func (*none) unmarshal(*csup.Context, field.Projection) {} + +func (n *none) project(loader *loader, projection field.Projection) vector.Any { + if len(projection) > 0 { + return vector.NewMissing(loader.sctx, n.length()) + } + return vector.NewNone(n.meta.Count) +} diff --git a/runtime/vcache/null.go b/runtime/vcache/null.go new file mode 100644 index 0000000000..b4f05bc79c --- /dev/null +++ b/runtime/vcache/null.go @@ -0,0 +1,28 @@ +package vcache + +import ( + "github.com/brimdata/super/csup" + "github.com/brimdata/super/pkg/field" + "github.com/brimdata/super/vector" +) + +type null struct { + meta *csup.Null +} + +func newNull(meta *csup.Null) *null { + return &null{meta: meta} +} + +func (n *null) length() uint32 { + return n.meta.Count +} + +func (*null) unmarshal(*csup.Context, field.Projection) {} + +func (n *null) project(loader *loader, projection field.Projection) vector.Any { + if len(projection) > 0 { + return vector.NewMissing(loader.sctx, n.length()) + } + return vector.NewNull(n.meta.Count) +} diff --git a/runtime/vcache/primitive.go b/runtime/vcache/primitive.go deleted file mode 100644 index 5d1d1d2472..0000000000 --- a/runtime/vcache/primitive.go +++ /dev/null @@ -1,140 +0,0 @@ -package vcache - -import ( - "fmt" - "net/netip" - "sync" - - "github.com/brimdata/super" - "github.com/brimdata/super/csup" - "github.com/brimdata/super/pkg/field" - "github.com/brimdata/super/scode" - "github.com/brimdata/super/vector" - "github.com/brimdata/super/vector/bitvec" -) - -type primitive struct { - mu sync.Mutex - meta *csup.Primitive - len uint32 - any any -} - -func newPrimitive(cctx *csup.Context, meta *csup.Primitive) *primitive { - return &primitive{meta: meta, len: meta.Len(cctx)} -} - -func (p *primitive) length() uint32 { - return p.len -} - -func (*primitive) unmarshal(*csup.Context, field.Projection) {} - -func (p *primitive) project(loader *loader, projection field.Projection) vector.Any { - if len(projection) > 0 { - return vector.NewMissing(loader.sctx, p.length()) - } - return p.newVector(loader) -} - -func (p *primitive) load(loader *loader) any { - p.mu.Lock() - defer p.mu.Unlock() - if p.any == nil { - p.any = p.loadAnyWithLock(loader) - } - return p.any -} - -func (p *primitive) loadAnyWithLock(loader *loader) any { - bytes := make([]byte, p.meta.Location.MemLength) - if err := p.meta.Location.Read(loader.r, bytes); err != nil { - panic(err) - } - length := p.length() - it := scode.Iter(bytes) - switch p.meta.Typ.(type) { - case *super.TypeOfUint8, *super.TypeOfUint16, *super.TypeOfUint32, *super.TypeOfUint64, *super.TypeEnum: - values := make([]uint64, 0, length) - for range length { - values = append(values, super.DecodeUint(it.Next())) - } - return values - case *super.TypeOfInt8, *super.TypeOfInt16, *super.TypeOfInt32, *super.TypeOfInt64, *super.TypeOfDuration, *super.TypeOfTime: - values := make([]int64, 0, length) - for range length { - values = append(values, super.DecodeInt(it.Next())) - } - return values - case *super.TypeOfFloat16, *super.TypeOfFloat32, *super.TypeOfFloat64: - values := make([]float64, 0, length) - for range length { - values = append(values, super.DecodeFloat(it.Next())) - } - return values - case *super.TypeOfBool: - bits := bitvec.NewFalse(length) - for slot := range length { - if super.DecodeBool(it.Next()) { - bits.Set(slot) - } - } - return bits - case *super.TypeOfBytes, *super.TypeOfString: - var bytes []byte - // First offset is always zero. - offs := make([]uint32, 1, length+1) - for range length { - bytes = append(bytes, it.Next()...) - offs = append(offs, uint32(len(bytes))) - } - return vector.NewBytesTable(offs, bytes) - case *super.TypeOfIP: - values := make([]netip.Addr, 0, length) - for range length { - values = append(values, super.DecodeIP(it.Next())) - } - return values - case *super.TypeOfNet: - values := make([]netip.Prefix, 0, length) - for range length { - values = append(values, super.DecodeNet(it.Next())) - } - return values - case *super.TypeOfNull: - return nil - } - panic(fmt.Errorf("internal error: vcache.loadPrimitive got unknown type %#v", p.meta.Typ)) -} - -func (p *primitive) newVector(loader *loader) vector.Any { - switch typ := p.meta.Typ.(type) { - case *super.TypeOfUint8, *super.TypeOfUint16, *super.TypeOfUint32, *super.TypeOfUint64: - return vector.NewUint(typ, p.load(loader).([]uint64)) - case *super.TypeOfInt8, *super.TypeOfInt16, *super.TypeOfInt32, *super.TypeOfInt64, *super.TypeOfDuration, *super.TypeOfTime: - return vector.NewInt(typ, p.load(loader).([]int64)) - case *super.TypeOfFloat16, *super.TypeOfFloat32, *super.TypeOfFloat64: - return vector.NewFloat(typ, p.load(loader).([]float64)) - case *super.TypeOfBool: - return vector.NewBool(p.load(loader).(bitvec.Bits)) - case *super.TypeOfBytes: - return vector.NewBytes(p.load(loader).(vector.BytesTable)) - case *super.TypeOfString: - return vector.NewString(p.load(loader).(vector.BytesTable)) - case *super.TypeOfIP: - return vector.NewIP(p.load(loader).([]netip.Addr)) - case *super.TypeOfNet: - return vector.NewNet(p.load(loader).([]netip.Prefix)) - case *super.TypeEnum: - // Despite being coded as a primitive, enums have complex types that - // must live in the query context so we can't use the type in the - // CSUP metadata as that context is local to the CSUP object. - t := loader.sctx.LookupTypeEnum(typ.Symbols) - return vector.NewEnum(t, p.load(loader).([]uint64)) - case *super.TypeOfNull: - return vector.NewNull(p.length()) - case *super.TypeOfNone: - return vector.NewNone(p.length()) - } - panic(fmt.Errorf("internal error: vcache.loadPrimitive got unknown type %#v", p.meta.Typ)) -} diff --git a/runtime/vcache/shadow.go b/runtime/vcache/shadow.go index 3758c0cc5a..16ef2c4f96 100644 --- a/runtime/vcache/shadow.go +++ b/runtime/vcache/shadow.go @@ -61,6 +61,8 @@ func newShadow(cctx *csup.Context, id csup.ID) shadow { return newMap(cctx, meta) case *csup.Union: return newUnion(cctx, meta) + case *csup.Enum: + return newEnum(meta, newShadow(cctx, meta.Values)) case *csup.Fusion: return newFusion(cctx, meta) case *csup.Dict: @@ -71,10 +73,18 @@ func newShadow(cctx *csup.Context, id csup.ID) shadow { return newUint(cctx, meta) case *csup.Float: return newFloat(cctx, meta) + case *csup.Bool: + return newBool(meta) case *csup.Bytes: return newBytes(cctx, meta) - case *csup.Primitive: - return newPrimitive(cctx, meta) + case *csup.IP: + return newIP(meta) + case *csup.Net: + return newNet(meta) + case *csup.Null: + return newNull(meta) + case *csup.None: + return newNone(meta) case *csup.TypeValue: return newTypeValue(cctx, meta) case *csup.Const: diff --git a/vector/vbuild/dynamic.go b/vector/vbuild/dynamic.go index 5e9c267d68..4b0a408e51 100644 --- a/vector/vbuild/dynamic.go +++ b/vector/vbuild/dynamic.go @@ -48,15 +48,19 @@ func (d *DynamicBuilder) write(vec vector.Any) uint32 { return i } +func (d *DynamicBuilder) Len() uint32 { + return uint32(len(d.tags)) +} + func (d *DynamicBuilder) Build() vector.Any { - out := d.build() + out := d.BuildDynamic() if len(out.Values) == 1 { return out.Values[0] } return out } -func (d *DynamicBuilder) build() *vector.Dynamic { +func (d *DynamicBuilder) BuildDynamic() *vector.Dynamic { var vecs []vector.Any for _, b := range d.values { vecs = append(vecs, b.Build())