diff --git a/csup/array.go b/csup/array.go index 845051deed..8a64cdb872 100644 --- a/csup/array.go +++ b/csup/array.go @@ -11,35 +11,16 @@ import ( type ArrayEncoder struct { typ super.Type values Encoder - offsets *offsetsEncoder + offsets *Uint32Encoder count uint32 } var _ Encoder = (*ArrayEncoder)(nil) -func NewArrayEncoder(cctx *Context, typ *super.TypeArray) *ArrayEncoder { +func NewArrayEncoder(cctx *Context, vec *vector.Array) *ArrayEncoder { return &ArrayEncoder{ - typ: typ.Type, - values: NewEncoder(cctx, typ.Type), - offsets: newOffsetsEncoder(), - } -} - -func (a *ArrayEncoder) Write(vec vector.Any) { - if vec.Len() == 0 { - return - } - switch vec := vec.(type) { - case *vector.Array: - a.count += vec.Len() - a.values.Write(vec.Values) - a.offsets.write(vec.Offsets) - case *vector.Set: - a.count += vec.Len() - a.values.Write(vec.Values) - a.offsets.write(vec.Offsets) - default: - panic(vec) + values: NewEncoder(cctx, vec.Values), + offsets: NewUint32Encoder(vec.Offsets), } } @@ -69,12 +50,11 @@ type SetEncoder struct { ArrayEncoder } -func NewSetEncoder(cctx *Context, typ *super.TypeSet) *SetEncoder { +func NewSetEncoder(cctx *Context, vec *vector.Set) *SetEncoder { return &SetEncoder{ ArrayEncoder{ - typ: typ.Type, - values: NewEncoder(cctx, typ.Type), - offsets: newOffsetsEncoder(), + values: NewEncoder(cctx, vec.Values), + offsets: &Uint32Encoder{vals: vec.Offsets}, }, } } diff --git a/csup/bool.go b/csup/bool.go new file mode 100644 index 0000000000..8b0377bd9b --- /dev/null +++ b/csup/bool.go @@ -0,0 +1,56 @@ +package csup + +import ( + "io" + + "github.com/brimdata/super/pkg/byteconv" + "github.com/brimdata/super/vector" + "github.com/brimdata/super/vector/bitvec" + "golang.org/x/sync/errgroup" +) + +type BoolEncoder struct { + bits bitvec.Bits + + // created on Encode + out []byte + fmt uint8 + bytesLen uint64 +} + +func NewBoolEncoder(vec *vector.Bool) Encoder { + return &BoolEncoder{bits: vec.Bits} +} + +func (b *BoolEncoder) Encode(group 
*errgroup.Group) { + group.Go(func() error { + bytes := byteconv.ReinterpretSlice[byte](b.bits.GetBits()) + f, out, err := compressBuffer(bytes) + if err != nil { + return err + } + b.fmt = f + b.out = out + b.bytesLen = uint64(len(bytes)) + return nil + }) +} + +func (b *BoolEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { + loc := Segment{ + Offset: off, + MemLength: uint64(len(b.out)), + Length: b.bytesLen, + CompressionFormat: b.fmt, + } + off += loc.Length + return off, cctx.enter(&Bool{ + Location: loc, + Count: b.bits.Len(), + }) +} + +func (b *BoolEncoder) Emit(w io.Writer) error { + _, err := w.Write(b.out) + return err +} diff --git a/csup/bytes.go b/csup/bytes.go index 456690a776..45ddd88b15 100644 --- a/csup/bytes.go +++ b/csup/bytes.go @@ -12,72 +12,121 @@ import ( type BytesEncoder struct { typ super.Type + bytes *BytesTableEncoder min, max []byte - bytes []byte - offsets *offsetsEncoder - - // These values are used for the Encode pass. - bytesFmt uint8 - bytesOut []byte - bytesLen uint64 } -func NewBytesEncoder(typ super.Type) *BytesEncoder { +func NewBytesEncoder(typ super.Type, table vector.BytesTable) *BytesEncoder { return &BytesEncoder{ - typ: typ, - offsets: newOffsetsEncoder(), + typ: typ, + bytes: &BytesTableEncoder{table: table}, } } -func (b *BytesEncoder) Write(vec vector.Any) { - if vec.Len() == 0 { - return - } - switch vec := vec.(type) { - case *vector.Bytes: - b.writeTable(vec.Table()) - case *vector.String: - b.writeTable(vec.Table()) - default: - panic(vec) - } +func (b *BytesEncoder) Encode(group *errgroup.Group) { + b.bytes.Encode(group) + group.Go(func() error { + table := b.bytes.table + if table.Len() == 0 { + return nil + } + b.min, b.max = table.Bytes(0), table.Bytes(0) + for i := range table.Len() { + v := table.Bytes(i) + if bytes.Compare(v, b.min) < 0 { + b.min = v + } + if bytes.Compare(v, b.max) > 0 { + b.max = v + } + } + return nil + }) } -func (b *BytesEncoder) writeTable(table vector.BytesTable) { - if 
len(b.bytes) == 0 { - val := table.Bytes(0) - b.min = append(b.min[:0], val...) - b.max = append(b.max[:0], val...) - } - for slot := range table.Len() { - val := table.Bytes(slot) - if bytes.Compare(val, b.min) < 0 { - b.min = append(b.min[:0], val...) +func (b *BytesEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { + off, bytesLoc, offsLoc := b.bytes.Segment(off) + return off, cctx.enter(&Bytes{ + Typ: b.typ, + Bytes: bytesLoc, + Offsets: offsLoc, + Min: b.min, + Max: b.max, + Count: b.bytes.table.Len(), + }) +} + +func (b *BytesEncoder) Emit(w io.Writer) error { + return b.bytes.Emit(w) +} + +func maybeStringBytesDict(typ super.Type, table vector.BytesTable) vector.Any { + flat := func(table vector.BytesTable) vector.Any { + if typ.ID() == super.IDString { + return vector.NewString(table) } - if bytes.Compare(val, b.max) > 0 { - b.max = append(b.max[:0], val...) + return vector.NewBytes(table) + } + var counts []uint32 + m := make(map[string]byte) + index := make([]byte, table.Len()) + out := vector.NewBytesTableEmpty(0) + for i := range table.Len() { + tag, ok := m[string(table.Bytes(i))] + if !ok { + if len(counts) > math.MaxUint8 { + return flat(table) + } + tag = byte(len(counts)) + b := table.Bytes(i) + m[string(b)] = tag + counts = append(counts, 0) + out.Append(b) } + index[i] = tag + counts[tag]++ } - b.bytes = append(b.bytes, table.RawBytes()...) - b.offsets.write(table.RawOffsets()) + if !isValidDict(int(table.Len()), int(out.Len())) { + return flat(table) + } + vec := flat(out) + if vec.Len() == 1 { + return vector.NewConst(vec, table.Len()) + } + return vector.NewDict(vec, index, counts) } -func (b *BytesEncoder) Encode(group *errgroup.Group) { +type BytesTableEncoder struct { + table vector.BytesTable + + // These values are used for the Encode pass. 
+ bytesFmt uint8 + bytesOut []byte + bytesLen uint64 + offsets *Uint32Encoder +} + +func NewBytesTableEncoder(table vector.BytesTable) *BytesTableEncoder { + return &BytesTableEncoder{table: table} +} + +func (b *BytesTableEncoder) Encode(group *errgroup.Group) { group.Go(func() error { - fmt, out, err := compressBuffer(b.bytes) + bytes := b.table.RawBytes() + fmt, out, err := compressBuffer(bytes) if err != nil { return err } b.bytesFmt = fmt b.bytesOut = out - b.bytesLen = uint64(len(b.bytes)) - b.bytes = nil // send to GC + b.bytesLen = uint64(len(bytes)) return nil }) + b.offsets = NewUint32Encoder(b.table.RawOffsets()) b.offsets.Encode(group) } -func (b *BytesEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { +func (b *BytesTableEncoder) Segment(off uint64) (uint64, Segment, Segment) { bytesLoc := Segment{ Offset: off, Length: uint64(len(b.bytesOut)), @@ -85,17 +134,10 @@ func (b *BytesEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { CompressionFormat: b.bytesFmt, } off, offsLoc := b.offsets.Segment(off + bytesLoc.Length) - return off, cctx.enter(&Bytes{ - Typ: b.typ, - Bytes: bytesLoc, - Offsets: offsLoc, - Min: b.min, - Max: b.max, - Count: uint32(len(b.offsets.vals) - 1), - }) + return off, bytesLoc, offsLoc } -func (b *BytesEncoder) Emit(w io.Writer) error { +func (b *BytesTableEncoder) Emit(w io.Writer) error { if len(b.bytesOut) > 0 { if _, err := w.Write(b.bytesOut); err != nil { return err @@ -103,39 +145,3 @@ func (b *BytesEncoder) Emit(w io.Writer) error { } return b.offsets.Emit(w) } - -func (b *BytesEncoder) value(slot uint32) []byte { - return b.bytes[b.offsets.vals[slot]:b.offsets.vals[slot+1]] -} - -func (b *BytesEncoder) Dict() (PrimitiveEncoder, []byte, []uint32) { - if len(b.bytes) == 0 { - return nil, nil, nil - } - m := make(map[string]byte) - var counts []uint32 - index := make([]byte, len(b.offsets.vals)-1) - table := vector.NewBytesTableEmpty(256) - for k := range uint32(len(index)) { - tag, ok := 
m[string(b.value(k))] - if !ok { - tag = byte(len(counts)) - v := b.value(k) - m[string(v)] = tag - table.Append(v) - counts = append(counts, 0) - if len(counts) > math.MaxUint8 { - return nil, nil, nil - } - } - index[k] = tag - counts[tag]++ - } - encoder := NewBytesEncoder(b.typ) - encoder.Write(vector.NewBytes(table)) - return encoder, index, counts -} - -func (b *BytesEncoder) ConstValue() super.Value { - return super.NewValue(b.typ, b.value(0)) -} diff --git a/csup/const.go b/csup/const.go new file mode 100644 index 0000000000..f0ebe22c7c --- /dev/null +++ b/csup/const.go @@ -0,0 +1,21 @@ +package csup + +import ( + "io" + + "github.com/brimdata/super" + "golang.org/x/sync/errgroup" +) + +type ConstEncoder struct { + val super.Value + len uint32 +} + +func (*ConstEncoder) Encode(group *errgroup.Group) {} + +func (c *ConstEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { + return off, cctx.enter(&Const{c.val, c.len}) +} + +func (*ConstEncoder) Emit(w io.Writer) error { return nil } diff --git a/csup/dict.go b/csup/dict.go index 6ea3ba1e89..25170fecf6 100644 --- a/csup/dict.go +++ b/csup/dict.go @@ -3,70 +3,23 @@ package csup import ( "io" - "github.com/brimdata/super" "golang.org/x/sync/errgroup" ) type DictEncoder struct { - PrimitiveEncoder - typ super.Type - - // These fields are derived after Encode is called. 
+ values Encoder counts *Uint32Encoder index []byte - const_ *Const -} - -func NewDictEncoder(typ super.Type, values PrimitiveEncoder) Encoder { - if id := typ.ID(); id == super.IDUint8 || id == super.IDInt8 || id == super.IDBool { - return values - } - return &DictEncoder{ - typ: typ, - PrimitiveEncoder: values, - } } func (d *DictEncoder) Encode(group *errgroup.Group) { - group.Go(func() error { - entries, index, counts := d.Dict() - if entries == nil { - d.PrimitiveEncoder.Encode(group) - return nil - } - if len(counts) == 1 { - d.const_ = &Const{ - Value: d.ConstValue(), - Count: uint32(len(index)), - } - return nil - } - if !isValidDict(len(index), len(counts)) { - d.PrimitiveEncoder.Encode(group) - return nil - } - d.index = index - d.PrimitiveEncoder = entries - d.counts = &Uint32Encoder{vals: counts} - d.PrimitiveEncoder.Encode(group) - d.counts.Encode(group) - return nil - }) -} - -func isValidDict(len, card int) bool { - return card >= 1 && card < len + d.values.Encode(group) + d.counts.Encode(group) } func (d *DictEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { - if d.const_ != nil { - return off, cctx.enter(d.const_) - } - if d.counts == nil { - return d.PrimitiveEncoder.Metadata(cctx, off) - } meta := &Dict{Length: uint32(len(d.index))} - off, meta.Values = d.PrimitiveEncoder.Metadata(cctx, off) + off, meta.Values = d.values.Metadata(cctx, off) off, meta.Counts = d.counts.Segment(off) len := uint64(len(d.index)) meta.Index = Segment{ @@ -78,15 +31,9 @@ func (d *DictEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { } func (d *DictEncoder) Emit(w io.Writer) error { - if d.const_ != nil { - return nil - } - if err := d.PrimitiveEncoder.Emit(w); err != nil { + if err := d.values.Emit(w); err != nil { return err } - if d.counts == nil { - return nil - } if err := d.counts.Emit(w); err != nil { return err } diff --git a/csup/dynamic.go b/csup/dynamic.go index dd323952ce..7578b3e365 100644 --- a/csup/dynamic.go +++ b/csup/dynamic.go @@ 
-3,81 +3,35 @@ package csup import ( "io" - "github.com/brimdata/super" "github.com/brimdata/super/vector" "golang.org/x/sync/errgroup" ) type DynamicEncoder struct { cctx *Context - tags []uint32 - tagEnc *Uint32Encoder + tags *Uint32Encoder values []Encoder - which map[super.Type]uint32 len uint32 } -func NewDynamicEncoder() *DynamicEncoder { - return &DynamicEncoder{ - cctx: NewContext(), - which: make(map[super.Type]uint32), - } -} - -// The dynamic encoder self-organizes around the types that are -// written to it. No need to define the schema up front! -// We track the types seen first-come, first-served and the -// CSUP metadata structure follows accordingly. -func (d *DynamicEncoder) Write(vec vector.Any) { - if vec.Len() == 0 { - return - } - if dynamic, ok := vec.(*vector.Dynamic); ok { - d.appendDynamic(dynamic) - } else { - d.appendVec(vec) - } -} - -func (d *DynamicEncoder) appendDynamic(vec *vector.Dynamic) { - var tagmap []uint32 // input tags to local tags - for _, vec := range vec.Values { - tagmap = append(tagmap, d.lookupType(vec.Type())) +func NewDynamicEncoder(vec *vector.Dynamic) *DynamicEncoder { + cctx := NewContext() + values := make([]Encoder, len(vec.Values)) + for i, val := range vec.Values { + values[i] = NewEncoder(cctx, val) } - for _, intag := range vec.Tags { - d.tags = append(d.tags, tagmap[intag]) - } - for intag, vec := range vec.Values { - d.values[tagmap[intag]].Write(vec) - } - d.len += vec.Len() -} - -func (d *DynamicEncoder) appendVec(vec vector.Any) { - tag := d.lookupType(vec.Type()) - for range vec.Len() { - //XXX there's a better way, but let's get this working - d.tags = append(d.tags, tag) - } - d.values[tag].Write(vec) - d.len += vec.Len() -} - -func (d *DynamicEncoder) lookupType(typ super.Type) uint32 { - tag, ok := d.which[typ] - if !ok { - tag = uint32(len(d.values)) - d.values = append(d.values, NewEncoder(d.cctx, typ)) - d.which[typ] = tag + return &DynamicEncoder{ + cctx: cctx, + tags: 
NewUint32Encoder(vec.Tags), + values: values, + len: vec.Len(), } - return tag } func (d *DynamicEncoder) Encode() (ID, uint64, error) { var group errgroup.Group - d.tagEnc = &Uint32Encoder{vals: d.tags} if len(d.values) > 1 { - d.tagEnc.Encode(&group) + d.tags.Encode(&group) } for _, val := range d.values { val.Encode(&group) @@ -90,7 +44,7 @@ func (d *DynamicEncoder) Encode() (ID, uint64, error) { return id, off, nil } values := make([]ID, 0, len(d.values)) - off, tags := d.tagEnc.Segment(0) + off, tags := d.tags.Segment(0) for _, val := range d.values { var id ID off, id = val.Metadata(d.cctx, off) @@ -105,7 +59,7 @@ func (d *DynamicEncoder) Encode() (ID, uint64, error) { func (d *DynamicEncoder) Emit(w io.Writer) error { if len(d.values) > 1 { - if err := d.tagEnc.Emit(w); err != nil { + if err := d.tags.Emit(w); err != nil { return err } } diff --git a/csup/encoder.go b/csup/encoder.go index 5d2ca700aa..53e21c7350 100644 --- a/csup/encoder.go +++ b/csup/encoder.go @@ -3,15 +3,16 @@ package csup import ( "fmt" "io" + "math" + "net/netip" "github.com/brimdata/super" + "github.com/brimdata/super/scode" "github.com/brimdata/super/vector" "golang.org/x/sync/errgroup" ) type Encoder interface { - // Write collects up values to be encoded into memory. - Write(vector.Any) // Encode encodes all in-memory vector data into its storage-ready serialized format. // Vectors may be encoded concurrently and errgroup.Group is used to sync // and return errors. 
@@ -28,57 +29,155 @@ type Encoder interface { Emit(w io.Writer) error } -type PrimitiveEncoder interface { - Encoder - Dict() (PrimitiveEncoder, []byte, []uint32) - ConstValue() super.Value +func NewEncoder(cctx *Context, vec vector.Any) Encoder { + switch vec := vec.(type) { + case *vector.Record: + return NewRecordEncoder(cctx, vec) + case *vector.Array: + return NewArrayEncoder(cctx, vec) + case *vector.Set: + return NewSetEncoder(cctx, vec) + case *vector.Map: + return NewMapEncoder(cctx, vec) + case *vector.Union: + return NewUnionEncoder(cctx, vec) + case *vector.Enum: + return NewEnumEncoder(vec) + case *vector.Error: + return &ErrorEncoder{NewEncoder(cctx, vec.Vals)} + case *vector.Named: + return NewNamedEncoder(cctx, vec) + case *vector.Fusion: + return NewFusionEncoder(cctx, vec) + default: + return NewPrimitiveEncoder(cctx, vec, true) + } } -func NewEncoder(cctx *Context, typ super.Type) Encoder { - switch typ := typ.(type) { - case *super.TypeNamed: - return &NamedEncoder{cctx: cctx, typ: typ} - case *super.TypeError: - return &ErrorEncoder{NewEncoder(cctx, typ.Type)} - case *super.TypeRecord: - return NewRecordEncoder(cctx, typ) - case *super.TypeArray: - return NewArrayEncoder(cctx, typ) - case *super.TypeSet: - // Sets encode the same way as arrays but behave - // differently semantically, and we don't care here. 
- return NewSetEncoder(cctx, typ) - case *super.TypeMap: - return NewMapEncoder(cctx, typ) - case *super.TypeUnion: - return NewUnionEncoder(cctx, typ) - case *super.TypeFusion: - return NewFusionEncoder(cctx, typ) - case *super.TypeEnum: - return NewPrimitiveEncoder(cctx, typ) - default: - if !super.IsPrimitiveType(typ) { - panic(fmt.Sprintf("unsupported type in CSUP file: %T", typ)) +func NewPrimitiveEncoder(cctx *Context, vec vector.Any, root bool) Encoder { + switch vec := vec.(type) { + case *vector.Dict: + return &DictEncoder{ + counts: NewUint32Encoder(vec.Counts), + values: NewPrimitiveEncoder(cctx, vec.Any, false), + index: vec.Index, + } + case *vector.Const: + return &ConstEncoder{ + val: vector.ValueAt(new(scode.Builder), vec.Any, 0), + len: vec.Len(), + } + case *vector.Uint: + if root { + // XXX This is a potentially computationally intensive operation and + // should probably be moved to the Encode pass where it can be + // parallelized. + out := maybeConvertToDictOrConst(vec.Values, func(vals []uint64) vector.Any { + return vector.NewUint(vec.Typ, vals) + }) + return NewPrimitiveEncoder(cctx, out, false) + } + return &UintEncoder{typ: vec.Typ, vals: vec.Values} + case *vector.Int: + if root { + out := maybeConvertToDictOrConst(vec.Values, func(vals []int64) vector.Any { + return vector.NewInt(vec.Typ, vals) + }) + return NewPrimitiveEncoder(cctx, out, false) + } + return &IntEncoder{typ: vec.Typ, vals: vec.Values} + case *vector.Float: + if root { + out := maybeConvertToDictOrConst(vec.Values, func(vals []float64) vector.Any { + return vector.NewFloat(vec.Typ, vals) + }) + return NewPrimitiveEncoder(cctx, out, false) + } + return NewFloatEncoder(vec.Typ, vec.Values) + case *vector.Bool: + // XXX can convert all trues and all falses to consts.
+ return NewBoolEncoder(vec) + case *vector.String: + if root { + out := maybeStringBytesDict(super.TypeString, vec.Table()) + return NewPrimitiveEncoder(cctx, out, false) + } + return NewBytesEncoder(super.TypeString, vec.Table()) + case *vector.Bytes: + if root { + out := maybeStringBytesDict(super.TypeBytes, vec.Table()) + return NewPrimitiveEncoder(cctx, out, false) + } + return NewBytesEncoder(super.TypeBytes, vec.Table()) + case *vector.IP: + if root { + out := maybeConvertToDictOrConst(vec.Values, func(vals []netip.Addr) vector.Any { + return vector.NewIP(vals) + }) + return NewPrimitiveEncoder(cctx, out, false) + } + return NewIPEncoder(vec.Values) + case *vector.Net: + if root { + out := maybeConvertToDictOrConst(vec.Values, func(vals []netip.Prefix) vector.Any { + return vector.NewNet(vals) + }) + return NewPrimitiveEncoder(cctx, out, false) } - return NewDictEncoder(typ, NewPrimitiveEncoder(cctx, typ)) + return NewNetEncoder(vec.Values) + case *vector.TypeValue: + if root { + out := maybeConvertToDictOrConst(vec.Types(), func(vals []super.Type) vector.Any { + return vector.NewTypeValue(vals) + }) + return NewPrimitiveEncoder(cctx, out, false) + } + return NewTypeValueEncoder(cctx, vec) + case *vector.Null: + return NewNullEncoder(vec.Len()) + case *vector.None: + return NewNoneEncoder(vec.Len()) + default: + panic(fmt.Sprintf("unsupported type in CSUP file: %T", vec)) } } -func NewPrimitiveEncoder(cctx *Context, typ super.Type) PrimitiveEncoder { - switch id := typ.ID(); { - case super.IsSigned(id): - return NewIntEncoder(typ) - case super.IsUnsigned(id): - return NewUintEncoder(typ) - case super.IsFloat(id): - return NewFloatEncoder(typ) - case id == super.IDBytes || id == super.IDString: - return NewBytesEncoder(typ) - case id == super.IDType: - return NewTypeValueEncoder(cctx) - default: - return NewScodeEncoder(typ) +func maybeConvertToDictOrConst[E comparable](in []E, fn func([]E) vector.Any) vector.Any { + vals, index, counts := comparableDict(in) 
+ if vals == nil || !isValidDict(len(in), len(vals)) { + return fn(in) + } + flat := fn(vals) + if len(vals) == 1 { + return vector.NewConst(flat, counts[0]) + } + return vector.NewDict(flat, index, counts) +} + +func isValidDict(len, card int) bool { + return card >= 1 && card < len +} + +func comparableDict[T comparable](in []T) ([]T, []byte, []uint32) { + m := make(map[T]byte) + var counts []uint32 + index := make([]byte, len(in)) + var vals []T + for k, v := range in { + tag, ok := m[v] + if !ok { + if len(counts) > math.MaxUint8 { + return nil, nil, nil + } + tag = byte(len(counts)) + m[v] = tag + counts = append(counts, 0) + vals = append(vals, v) + } + index[k] = tag + counts[tag]++ } + return vals, index, counts } type ErrorEncoder struct { @@ -89,7 +188,3 @@ func (e *ErrorEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { off, id := e.Encoder.Metadata(cctx, off) return off, cctx.enter(&Error{id}) } - -func (e *ErrorEncoder) Write(vec vector.Any) { - e.Encoder.Write(vec.(*vector.Error).Vals) -} diff --git a/csup/enum.go b/csup/enum.go new file mode 100644 index 0000000000..857319e6bf --- /dev/null +++ b/csup/enum.go @@ -0,0 +1,23 @@ +package csup + +import ( + "github.com/brimdata/super" + "github.com/brimdata/super/vector" +) + +type EnumEncoder struct { + *UintEncoder + typ *super.TypeEnum +} + +func NewEnumEncoder(vec *vector.Enum) *EnumEncoder { + return &EnumEncoder{ + UintEncoder: &UintEncoder{typ: vec.Uint.Typ, vals: vec.Uint.Values}, + typ: vec.Typ, + } +} + +func (e *EnumEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { + off, values := e.UintEncoder.Metadata(cctx, off) + return off, cctx.enter(&Enum{Symbols: e.typ.Symbols, Values: values}) +} diff --git a/csup/field.go b/csup/field.go index 17c94c4e79..313b012e68 100644 --- a/csup/field.go +++ b/csup/field.go @@ -3,20 +3,14 @@ package csup import ( "io" - "github.com/brimdata/super/vector" "golang.org/x/sync/errgroup" ) -// XXX in a forthcoming PR, we will change this to 
OptionEncoder type FieldEncoder struct { name string values Encoder } -func (f *FieldEncoder) write(vec vector.Any) { - f.values.Write(vec) -} - func (f *FieldEncoder) Metadata(cctx *Context, off uint64) (uint64, Field) { var id ID off, id = f.values.Metadata(cctx, off) diff --git a/csup/float.go b/csup/float.go index 1d6b309bc2..7a8c6bc963 100644 --- a/csup/float.go +++ b/csup/float.go @@ -2,12 +2,10 @@ package csup import ( "io" - "math" "slices" "github.com/brimdata/super" "github.com/brimdata/super/pkg/byteconv" - "github.com/brimdata/super/vector" "golang.org/x/sync/errgroup" ) @@ -19,28 +17,8 @@ type FloatEncoder struct { fmt uint8 } -func NewFloatEncoder(typ super.Type) *FloatEncoder { - return &FloatEncoder{typ: typ} -} - -func (f *FloatEncoder) Write(vec vector.Any) { - if vec.Len() == 0 { - return - } - fv := vec.(*vector.Float) - if len(f.vals) == 0 { - f.min = fv.Values[0] - f.max = fv.Values[0] - } - for _, v := range fv.Values { - if v < f.min { - f.min = v - } - if v > f.max { - f.max = v - } - } - f.vals = append(f.vals, fv.Values...) 
+func NewFloatEncoder(typ super.Type, vals []float64) *FloatEncoder { + return &FloatEncoder{typ: typ, vals: vals} } func (f *FloatEncoder) Encode(group *errgroup.Group) { @@ -50,6 +28,12 @@ func (f *FloatEncoder) Encode(group *errgroup.Group) { f.fmt, f.out, err = compressBuffer(bytes) return err }) + if len(f.vals) > 0 { + group.Go(func() error { + f.min, f.max = minMax(f.vals) + return nil + }) + } } func (u *FloatEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { @@ -76,42 +60,3 @@ func (u *FloatEncoder) Emit(w io.Writer) error { } return err } - -func comparableDict[T comparable](in []T) ([]T, []byte, []uint32) { - m := make(map[T]byte) - var counts []uint32 - index := make([]byte, len(in)) - var vals []T - for k, v := range in { - tag, ok := m[v] - if !ok { - tag = byte(len(counts)) - m[v] = tag - counts = append(counts, 0) - vals = append(vals, v) - if len(counts) > math.MaxUint8 { - return nil, nil, nil - } - } - index[k] = tag - counts[tag]++ - } - return vals, index, counts -} - -func (f *FloatEncoder) Dict() (PrimitiveEncoder, []byte, []uint32) { - vals, index, count := comparableDict(f.vals) - if vals == nil { - return nil, nil, nil - } - return &FloatEncoder{ - typ: f.typ, - vals: vals, - min: f.min, - max: f.max, - }, index, count -} - -func (f *FloatEncoder) ConstValue() super.Value { - return super.NewFloat(f.typ, f.vals[0]) -} diff --git a/csup/fusion.go b/csup/fusion.go index 7be954ee32..c5d6976a14 100644 --- a/csup/fusion.go +++ b/csup/fusion.go @@ -3,38 +3,26 @@ package csup import ( "io" - "github.com/brimdata/super" "github.com/brimdata/super/vector" "golang.org/x/sync/errgroup" ) type FusionEncoder struct { - typ *super.TypeFusion values Encoder subtypes Encoder } var _ Encoder = (*FusionEncoder)(nil) -func NewFusionEncoder(cctx *Context, typ *super.TypeFusion) *FusionEncoder { +func NewFusionEncoder(cctx *Context, vec *vector.Fusion) *FusionEncoder { return &FusionEncoder{ - typ: typ, - values: NewEncoder(cctx, typ.Type), + values: 
NewEncoder(cctx, vec.Values), // Call NewTypeValueEncoder directly because we do not want subtypes // wrapped in a dict / const. - subtypes: NewTypeValueEncoder(cctx), + subtypes: NewTypeValueEncoder(cctx, vec.Subtypes), } } -func (f *FusionEncoder) Write(vec vector.Any) { - if vec.Len() == 0 { - return - } - fusion := vec.(*vector.Fusion) - f.values.Write(fusion.Values) - f.subtypes.Write(fusion.Subtypes) -} - func (f *FusionEncoder) Emit(w io.Writer) error { if err := f.values.Emit(w); err != nil { return err diff --git a/csup/header.go b/csup/header.go index fd4904597a..68586a726d 100644 --- a/csup/header.go +++ b/csup/header.go @@ -8,7 +8,7 @@ import ( ) const ( - Version = 21 + Version = 22 HeaderSize = 36 MaxMetaSize = 100 * 1024 * 1024 MaxTypeSize = 100 * 1024 * 1024 diff --git a/csup/int.go b/csup/int.go index ad990b4b83..70ea1b4c96 100644 --- a/csup/int.go +++ b/csup/int.go @@ -1,46 +1,30 @@ package csup import ( + "cmp" "io" "github.com/brimdata/super" "github.com/brimdata/super/pkg/byteconv" - "github.com/brimdata/super/vector" "github.com/ronanh/intcomp" "golang.org/x/sync/errgroup" ) type IntEncoder struct { - typ super.Type - vals []int64 - min, max int64 - out []byte -} + typ super.Type + vals []int64 -func NewIntEncoder(typ super.Type) *IntEncoder { - return &IntEncoder{ - typ: typ, - } + // computed after encode is called. + out []byte + min int64 + max int64 } -func (i *IntEncoder) Write(vec vector.Any) { - if vec.Len() == 0 { - return - } - iv := vec.(*vector.Int) - if len(i.vals) == 0 { - i.min = iv.Values[0] - i.max = iv.Values[0] - } - for _, v := range iv.Values { - if v < i.min { - i.min = v - } - if v > i.max { - i.max = v - } +func NewIntEncoder(typ super.Type, vals []int64) *IntEncoder { + return &IntEncoder{ + typ: typ, + vals: vals, } - i.vals = append(i.vals, iv.Values...) 
} func (i *IntEncoder) Encode(group *errgroup.Group) { @@ -49,6 +33,12 @@ func (i *IntEncoder) Encode(group *errgroup.Group) { i.out = byteconv.ReinterpretSlice[byte](compressed) return nil }) + if len(i.vals) > 0 { + group.Go(func() error { + i.min, i.max = minMax(i.vals) + return nil + }) + } } func (i *IntEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { @@ -76,23 +66,6 @@ func (i *IntEncoder) Emit(w io.Writer) error { return err } -func (i *IntEncoder) Dict() (PrimitiveEncoder, []byte, []uint32) { - entries, index, counts := comparableDict(i.vals) - if entries == nil { - return nil, nil, nil - } - return &IntEncoder{ - typ: i.typ, - vals: entries, - min: i.min, - max: i.max, - }, index, counts -} - -func (i *IntEncoder) ConstValue() super.Value { - return super.NewInt(i.typ, i.vals[0]) -} - type UintEncoder struct { typ super.Type vals []uint64 @@ -100,46 +73,41 @@ type UintEncoder struct { out []byte } -func NewUintEncoder(typ super.Type) *UintEncoder { - return &UintEncoder{typ: typ} -} - -func (u *UintEncoder) Write(vec vector.Any) { - if vec.Len() == 0 { - return - } - uv := vec.(*vector.Uint) - if len(u.vals) == 0 { - u.min = uv.Values[0] - u.max = uv.Values[0] - } - for _, v := range uv.Values { - if v < u.min { - u.min = v - } - if v > u.max { - u.max = v - } - } - u.vals = append(u.vals, uv.Values...) 
-} - func (u *UintEncoder) Encode(group *errgroup.Group) { group.Go(func() error { compressed := intcomp.CompressUint64(u.vals, nil) u.out = byteconv.ReinterpretSlice[byte](compressed) return nil }) + if len(u.vals) > 0 { + group.Go(func() error { + u.min, u.max = minMax(u.vals) + return nil + }) + } } -func (u *UintEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { +func minMax[T cmp.Ordered](vals []T) (T, T) { + minVal, maxVal := vals[0], vals[0] + for _, v := range vals { + minVal = min(minVal, v) + maxVal = max(maxVal, v) + } + return minVal, maxVal +} + +func (u *UintEncoder) Segment(off uint64) (uint64, Segment) { loc := Segment{ Offset: off, MemLength: uint64(len(u.out)), Length: uint64(len(u.vals)) * 8, CompressionFormat: CompressionFormatNone, } - off += loc.MemLength + return off + loc.MemLength, loc +} + +func (u *UintEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { + off, loc := u.Segment(off) return off, cctx.enter(&Uint{ Typ: u.typ, Location: loc, @@ -157,29 +125,16 @@ func (u *UintEncoder) Emit(w io.Writer) error { return err } -func (u *UintEncoder) Dict() (PrimitiveEncoder, []byte, []uint32) { - entries, index, counts := comparableDict(u.vals) - if entries == nil { - return nil, nil, nil - } - return &UintEncoder{ - typ: u.typ, - vals: entries, - min: u.min, - max: u.max, - }, index, counts -} - -func (u *UintEncoder) ConstValue() super.Value { - return super.NewUint(u.typ, u.vals[0]) -} - type Uint32Encoder struct { vals []uint32 out []byte bytesLen uint64 } +func NewUint32Encoder(vals []uint32) *Uint32Encoder { + return &Uint32Encoder{vals: vals} +} + func (u *Uint32Encoder) Write(v uint32) { u.vals = append(u.vals, v) } @@ -217,25 +172,6 @@ func (u *Uint32Encoder) Segment(off uint64) (uint64, Segment) { } } -type offsetsEncoder struct { - Uint32Encoder -} - -func newOffsetsEncoder() *offsetsEncoder { - return &offsetsEncoder{} -} - -func (o *offsetsEncoder) write(offsets []uint32) { - if len(o.vals) == 0 { - o.vals = offsets 
- } else { - base := o.vals[len(o.vals)-1] - for _, off := range offsets[1:] { - o.vals = append(o.vals, base+off) - } - } -} - func ReadUint32s(loc Segment, r io.ReaderAt) ([]uint32, error) { buf := make([]byte, loc.MemLength) if err := loc.Read(r, buf); err != nil { diff --git a/csup/ip.go b/csup/ip.go new file mode 100644 index 0000000000..61529acab3 --- /dev/null +++ b/csup/ip.go @@ -0,0 +1,67 @@ +package csup + +import ( + "io" + "net/netip" + + "github.com/brimdata/super/vector" + "golang.org/x/sync/errgroup" +) + +type IPEncoder struct { + vals []netip.Addr + table *BytesTableEncoder + min, max netip.Addr +} + +func NewIPEncoder(vals []netip.Addr) *IPEncoder { + return &IPEncoder{ + vals: vals, + } +} + +func (i *IPEncoder) Encode(group *errgroup.Group) { + group.Go(func() error { + var bytes []byte + offsets := []uint32{0} + for _, ip := range i.vals { + var err error + if bytes, err = ip.AppendBinary(bytes); err != nil { + panic(err) + } + offsets = append(offsets, uint32(len(bytes))) + } + i.table = NewBytesTableEncoder(vector.NewBytesTable(offsets, bytes)) + i.table.Encode(group) + return nil + }) + if len(i.vals) > 0 { + group.Go(func() error { + i.min, i.max = i.vals[0], i.vals[0] + for _, v := range i.vals { + if v.Compare(i.min) < 0 { + i.min = v + } + if v.Compare(i.max) > 0 { + i.max = v + } + } + return nil + }) + } +} + +func (i *IPEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { + off, bytesLoc, offsLoc := i.table.Segment(off) + return off, cctx.enter(&IP{ + Bytes: bytesLoc, + Offsets: offsLoc, + Min: i.min, + Max: i.max, + Count: uint32(len(i.vals)), + }) +} + +func (i *IPEncoder) Emit(w io.Writer) error { + return i.table.Emit(w) +} diff --git a/csup/map.go b/csup/map.go index 42f5c312b7..6941998795 100644 --- a/csup/map.go +++ b/csup/map.go @@ -3,7 +3,6 @@ package csup import ( "io" - "github.com/brimdata/super" "github.com/brimdata/super/vector" "golang.org/x/sync/errgroup" ) @@ -11,29 +10,18 @@ import ( type MapEncoder struct { 
keys Encoder values Encoder - offsets *offsetsEncoder + offsets *Uint32Encoder count uint32 } -func NewMapEncoder(cctx *Context, typ *super.TypeMap) *MapEncoder { +func NewMapEncoder(cctx *Context, vec *vector.Map) *MapEncoder { return &MapEncoder{ - keys: NewEncoder(cctx, typ.KeyType), - values: NewEncoder(cctx, typ.ValType), - offsets: newOffsetsEncoder(), + keys: NewEncoder(cctx, vec.Keys), + values: NewEncoder(cctx, vec.Values), + offsets: NewUint32Encoder(vec.Offsets), } } -func (m *MapEncoder) Write(vec vector.Any) { - if vec.Len() == 0 { - return - } - mapVec := vec.(*vector.Map) - m.count += vec.Len() - m.keys.Write(mapVec.Keys) - m.values.Write(mapVec.Values) - m.offsets.write(mapVec.Offsets) -} - func (m *MapEncoder) Emit(w io.Writer) error { if err := m.offsets.Emit(w); err != nil { return err diff --git a/csup/metadata.go b/csup/metadata.go index 13c8ac5733..a8e6337901 100644 --- a/csup/metadata.go +++ b/csup/metadata.go @@ -1,6 +1,7 @@ package csup import ( + "net/netip" "slices" "github.com/brimdata/super" @@ -91,6 +92,15 @@ func (u *Union) Len(*Context) uint32 { return u.Length } +type Enum struct { + Symbols []string + Values ID +} + +func (e *Enum) Len(cctx *Context) uint32 { + return cctx.Lookup(e.Values).Len(cctx) +} + type Named struct { Name string Values ID @@ -165,6 +175,15 @@ func (f *Float) Len(*Context) uint32 { return f.Count } +type Bool struct { + Location Segment + Count uint32 +} + +func (b *Bool) Len(*Context) uint32 { + return b.Count +} + type Bytes struct { Typ super.Type `super:"Type"` Bytes Segment @@ -191,21 +210,44 @@ func (t *TypeValue) Len(*Context) uint32 { return t.Length } -type Primitive struct { - Typ super.Type `super:"Type"` - Location Segment - MinMax bool - Min super.Value - Max super.Value - Count uint32 +type IP struct { + Bytes Segment + Offsets Segment + Min netip.Addr + Max netip.Addr + Count uint32 +} + +func (n *IP) Len(*Context) uint32 { + return n.Count +} + +type Net struct { + Bytes Segment + Offsets 
Segment + Min netip.Prefix + Max netip.Prefix + Count uint32 } -func (p *Primitive) Type(*Context, *super.Context) super.Type { - return p.Typ +func (n *Net) Len(*Context) uint32 { + return n.Count +} + +type Null struct { + Count uint32 +} + +func (n *Null) Len(*Context) uint32 { + return n.Count +} + +type None struct { + Count uint32 } -func (p *Primitive) Len(*Context) uint32 { - return p.Count +func (n *None) Len(*Context) uint32 { + return n.Count } type Const struct { @@ -277,8 +319,6 @@ func metadataValue(cctx *Context, sctx *super.Context, b *scode.Builder, id ID, } b.EndContainer() return sctx.MustLookupTypeRecord(fields) - case *Primitive: - return metadataLeaf(sctx, b, m.Min, m.Max) case *Int: return metadataLeaf(sctx, b, super.NewInt(m.Typ, m.Min), super.NewInt(m.Typ, m.Max)) case *Uint: @@ -287,6 +327,10 @@ func metadataValue(cctx *Context, sctx *super.Context, b *scode.Builder, id ID, return metadataLeaf(sctx, b, super.NewFloat(m.Typ, m.Min), super.NewFloat(m.Typ, m.Max)) case *Bytes: return metadataLeaf(sctx, b, super.NewValue(m.Typ, m.Min), super.NewValue(m.Typ, m.Max)) + case *IP: + return metadataLeaf(sctx, b, super.NewIP(m.Min), super.NewIP(m.Max)) + case *Net: + return metadataLeaf(sctx, b, super.NewNet(m.Min), super.NewNet(m.Max)) case *Const: return metadataLeaf(sctx, b, m.Value, m.Value) default: @@ -322,7 +366,6 @@ var Template = []any{ Uint{}, Float{}, Bytes{}, - Primitive{}, TypeValue{}, Named{}, Error{}, @@ -331,4 +374,10 @@ var Template = []any{ Dynamic{}, Fusion{}, Empty{}, + Enum{}, + Bool{}, + IP{}, + Net{}, + Null{}, + None{}, } diff --git a/csup/named.go b/csup/named.go index 879bdc8cc4..9b5a158272 100644 --- a/csup/named.go +++ b/csup/named.go @@ -10,26 +10,15 @@ import ( type NamedEncoder struct { encoder Encoder - cctx *Context typ *super.TypeNamed } -func (n *NamedEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { - if n.encoder == nil { - return off, cctx.enter(&Empty{Type: n.typ}) - } - off, id := 
n.encoder.Metadata(cctx, off) - return off, cctx.enter(&Named{n.typ.Name, id}) -} - -func (n *NamedEncoder) Write(vec vector.Any) { - if vec.Len() == 0 { - return - } - if n.encoder == nil { - n.encoder = NewEncoder(n.cctx, n.typ.Type) +func NewNamedEncoder(cctx *Context, named *vector.Named) Encoder { + e := &NamedEncoder{typ: named.Typ} + if _, ok := named.Any.(*vector.Empty); !ok { + e.encoder = NewEncoder(cctx, named.Any) } - n.encoder.Write(vec.(*vector.Named).Any) + return e } func (n *NamedEncoder) Encode(group *errgroup.Group) { @@ -38,6 +27,14 @@ func (n *NamedEncoder) Encode(group *errgroup.Group) { } } +func (n *NamedEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { + if n.encoder == nil { + return off, cctx.enter(&Empty{Type: n.typ}) + } + off, id := n.encoder.Metadata(cctx, off) + return off, cctx.enter(&Named{n.typ.Name, id}) +} + func (n *NamedEncoder) Emit(w io.Writer) error { if n.encoder != nil { return n.encoder.Emit(w) diff --git a/csup/net.go b/csup/net.go new file mode 100644 index 0000000000..bdc97a3145 --- /dev/null +++ b/csup/net.go @@ -0,0 +1,67 @@ +package csup + +import ( + "io" + "net/netip" + + "github.com/brimdata/super/vector" + "golang.org/x/sync/errgroup" +) + +type NetEncoder struct { + vals []netip.Prefix + table *BytesTableEncoder + min, max netip.Prefix +} + +func NewNetEncoder(vals []netip.Prefix) *NetEncoder { + return &NetEncoder{ + vals: vals, + } +} + +func (n *NetEncoder) Encode(group *errgroup.Group) { + group.Go(func() error { + var bytes []byte + offsets := []uint32{0} + for _, net := range n.vals { + var err error + if bytes, err = net.AppendBinary(bytes); err != nil { + panic(err) + } + offsets = append(offsets, uint32(len(bytes))) + } + n.table = NewBytesTableEncoder(vector.NewBytesTable(offsets, bytes)) + n.table.Encode(group) + return nil + }) + if len(n.vals) > 0 { + group.Go(func() error { + n.min, n.max = n.vals[0], n.vals[0] + for _, v := range n.vals { + if v.Compare(n.min) < 0 { + n.min = v + } + 
if v.Compare(n.max) > 0 { + n.max = v + } + } + return nil + }) + } +} + +func (n *NetEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { + off, bytesLoc, offsLoc := n.table.Segment(off) + return off, cctx.enter(&Net{ + Bytes: bytesLoc, + Offsets: offsLoc, + Min: n.min, + Max: n.max, + Count: uint32(len(n.vals)), + }) +} + +func (n *NetEncoder) Emit(w io.Writer) error { + return n.table.Emit(w) +} diff --git a/csup/none.go b/csup/none.go new file mode 100644 index 0000000000..89d769eb07 --- /dev/null +++ b/csup/none.go @@ -0,0 +1,22 @@ +package csup + +import ( + "io" + + "golang.org/x/sync/errgroup" +) + +type NoneEncoder struct { + len uint32 +} + +func NewNoneEncoder(len uint32) *NoneEncoder { + return &NoneEncoder{len} +} + +func (n *NoneEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { + return off, cctx.enter(&None{n.len}) +} + +func (*NoneEncoder) Encode(group *errgroup.Group) {} +func (*NoneEncoder) Emit(io.Writer) error { return nil } diff --git a/csup/null.go b/csup/null.go new file mode 100644 index 0000000000..e5d532371c --- /dev/null +++ b/csup/null.go @@ -0,0 +1,22 @@ +package csup + +import ( + "io" + + "golang.org/x/sync/errgroup" +) + +type NullEncoder struct { + len uint32 +} + +func NewNullEncoder(len uint32) *NullEncoder { + return &NullEncoder{len} +} + +func (n *NullEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { + return off, cctx.enter(&Null{n.len}) +} + +func (*NullEncoder) Encode(*errgroup.Group) {} +func (*NullEncoder) Emit(io.Writer) error { return nil } diff --git a/csup/record.go b/csup/record.go index ea46c21176..20ca6c30f1 100644 --- a/csup/record.go +++ b/csup/record.go @@ -3,7 +3,6 @@ package csup import ( "io" - "github.com/brimdata/super" "github.com/brimdata/super/vector" "golang.org/x/sync/errgroup" ) @@ -15,26 +14,15 @@ type RecordEncoder struct { var _ Encoder = (*RecordEncoder)(nil) -func NewRecordEncoder(cctx *Context, typ *super.TypeRecord) *RecordEncoder { - fields := make([]*FieldEncoder, 0, 
len(typ.Fields)) - for _, f := range typ.Fields { +func NewRecordEncoder(cctx *Context, vec *vector.Record) *RecordEncoder { + fields := make([]*FieldEncoder, 0, len(vec.Fields)) + for i, f := range vec.Fields { fields = append(fields, &FieldEncoder{ - name: f.Name, - values: NewEncoder(cctx, f.Type), + name: vec.Typ.Fields[i].Name, + values: NewEncoder(cctx, f), }) } - return &RecordEncoder{fields: fields} -} - -func (r *RecordEncoder) Write(vec vector.Any) { - if vec.Len() == 0 { - return - } - rec := vec.(*vector.Record) - r.count += rec.Len() - for k, f := range r.fields { - f.write(rec.Fields[k]) - } + return &RecordEncoder{fields: fields, count: vec.Len()} } func (r *RecordEncoder) Encode(group *errgroup.Group) { diff --git a/csup/scode.go b/csup/scode.go deleted file mode 100644 index 99db5a6240..0000000000 --- a/csup/scode.go +++ /dev/null @@ -1,143 +0,0 @@ -package csup - -import ( - "io" - "math" - - "github.com/brimdata/super" - "github.com/brimdata/super/order" - "github.com/brimdata/super/runtime/sam/expr" - "github.com/brimdata/super/scode" - "github.com/brimdata/super/vector" - "golang.org/x/sync/errgroup" -) - -const MaxDictSize = 256 - -type ScodeEncoder struct { - typ super.Type - bytes scode.Bytes - cmp expr.CompareFn - min super.Value - max super.Value - minmax bool - count uint32 - - // fields used after Encode is called - bytesLen uint64 - format uint8 - out []byte -} - -func NewScodeEncoder(typ super.Type) *ScodeEncoder { - return &ScodeEncoder{ - typ: typ, - cmp: expr.NewValueCompareFn(order.Asc, order.NullsFirst), - min: super.Null, - max: super.Null, - } -} - -// XXX TBD: change all the scode primitives to be native and get rid of -// this slow path here. 
-func (p *ScodeEncoder) Write(vec vector.Any) { - if vec.Len() == 0 { - return - } - var b scode.Builder - for slot := range vec.Len() { - b.Reset() - vec.Serialize(&b, slot) - body := b.Bytes().Body() - p.update(body) - p.bytes = scode.Append(p.bytes, body) - } -} - -func (p *ScodeEncoder) WriteBytes(bytes scode.Bytes) { - p.update(bytes) - p.bytes = scode.Append(p.bytes, bytes) -} - -func (p *ScodeEncoder) update(body scode.Bytes) { - p.count++ - val := super.NewValue(p.typ, body) - if !p.minmax || p.cmp(val, p.min) < 0 { - p.min = val.Copy() - } - if !p.minmax || p.cmp(val, p.max) > 0 { - p.max = val.Copy() - } - p.minmax = true -} - -func (p *ScodeEncoder) Encode(group *errgroup.Group) { - group.Go(func() error { - fmt, out, err := compressBuffer(p.bytes) - if err != nil { - return err - } - p.format = fmt - p.out = out - p.bytesLen = uint64(len(p.bytes)) - p.bytes = nil // send to GC - return nil - }) -} - -func (p *ScodeEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { - loc := Segment{ - Offset: off, - Length: uint64(len(p.out)), - MemLength: p.bytesLen, - CompressionFormat: p.format, - } - off += uint64(len(p.out)) - return off, cctx.enter(&Primitive{ - Typ: p.typ, - Location: loc, - Count: p.count, - MinMax: p.minmax, - Min: p.min, - Max: p.max, - }) -} - -func (p *ScodeEncoder) Emit(w io.Writer) error { - var err error - if len(p.out) > 0 { - _, err = w.Write(p.out) - } - return err -} - -func (p *ScodeEncoder) Dict() (PrimitiveEncoder, []byte, []uint32) { - m := make(map[string]byte) - var counts []uint32 - index := make([]byte, p.count) - entries := NewScodeEncoder(p.typ) - var k uint32 - it := p.bytes.Iter() - for !it.Done() { - v := it.Next() - tag, ok := m[string(v)] - if !ok { - tag = byte(len(counts)) - m[string(v)] = tag - counts = append(counts, 0) - entries.WriteBytes(v) - if len(counts) > math.MaxUint8 { - return nil, nil, nil - } - } - index[k] = tag - counts[tag]++ - k++ - } - return entries, index, counts -} - -func (p 
*ScodeEncoder) ConstValue() super.Value { - it := p.bytes.Iter() - return super.NewValue(p.typ, it.Next()) -} diff --git a/csup/type.go b/csup/type.go index b95ec94321..648eafd761 100644 --- a/csup/type.go +++ b/csup/type.go @@ -10,23 +10,17 @@ import ( type TypeValueEncoder struct { cctx *Context - ids []uint32 encoder *Uint32Encoder } -func NewTypeValueEncoder(cctx *Context) PrimitiveEncoder { - return &TypeValueEncoder{cctx: cctx} -} - -func (t *TypeValueEncoder) Write(vec vector.Any) { - types := vec.(*vector.TypeValue) - defs, ids := types.TypeIDs() - merger := super.NewTypeDefsMerger(t.cctx.TypeDefs(), defs) +func NewTypeValueEncoder(cctx *Context, vec *vector.TypeValue) Encoder { + defs, ids := vec.TypeIDs() + merger := super.NewTypeDefsMerger(cctx.TypeDefs(), defs) mapped := make([]uint32, 0, len(ids)) for _, localID := range ids { mapped = append(mapped, merger.LookupID(localID)) } - t.ids = append(t.ids, mapped...) + return &TypeValueEncoder{cctx: cctx, encoder: NewUint32Encoder(mapped)} } func (t *TypeValueEncoder) Emit(w io.Writer) error { @@ -34,7 +28,6 @@ func (t *TypeValueEncoder) Emit(w io.Writer) error { } func (t *TypeValueEncoder) Encode(group *errgroup.Group) { - t.encoder = &Uint32Encoder{vals: t.ids} t.encoder.Encode(group) } @@ -44,18 +37,3 @@ func (t *TypeValueEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { Location: loc, }) } - -func (t *TypeValueEncoder) Dict() (PrimitiveEncoder, []byte, []uint32) { - entries, index, counts := comparableDict(t.ids) - if entries == nil { - return nil, nil, nil - } - return &TypeValueEncoder{ - ids: entries, - }, index, counts -} - -func (t *TypeValueEncoder) ConstValue() super.Value { - typ := super.NewTypeDefsMapper(t.cctx.local, t.cctx.typedefs).LookupType(t.ids[0]) - return super.NewValue(super.TypeType, super.EncodeTypeValue(typ)) -} diff --git a/csup/union.go b/csup/union.go index bd63bf0224..999430e0cb 100644 --- a/csup/union.go +++ b/csup/union.go @@ -3,119 +3,38 @@ package csup import ( 
"io" - "github.com/brimdata/super" - "github.com/brimdata/super/sup" "github.com/brimdata/super/vector" "golang.org/x/sync/errgroup" ) type UnionEncoder struct { - typ *super.TypeUnion - values []Encoder - count uint32 - tags []uint32 - tagEnc *Uint32Encoder + values []Encoder + rleOrTags *Uint32Encoder + count uint32 } var _ Encoder = (*UnionEncoder)(nil) -func NewUnionEncoder(cctx *Context, typ *super.TypeUnion) *UnionEncoder { +func NewUnionEncoder(cctx *Context, vec *vector.Union) *UnionEncoder { var values []Encoder - for _, typ := range typ.Types { - values = append(values, NewEncoder(cctx, typ)) + for _, val := range vec.Values() { + values = append(values, NewEncoder(cctx, val)) } - return &UnionEncoder{ - typ: typ, - values: values, - } -} - -func (u *UnionEncoder) Write(vec vector.Any) { - if vec.Len() == 0 { - return - } - union := vec.(*vector.Union) - u.count += vec.Len() - // Union vectors do not require that the values slice has - // alignment with the types in the union type. Thus, we can - // have vectors land here that have different orderings for - // the same union type. We could optimize this by adopting the - // order of the first vector and recomputing the tags for each - // subsequent incoming vector so that we don't have to rewrite - // the tags of the first vector, but for now, we just map - // everything to canonical order of the union types. - var vecs []vector.Any - if len(union.Typ.Types) == 2 { - // Code tags as run lengths. - rle := union.TagsRLE() - if rle == nil { - // Encoder returns nil for all tag 0 - rle = []uint32{0, vec.Len()} - } - // RLEs have the nice property that you can just concatenate them - // to append two vectors. - vecs, rle = reorderRLE(union, rle) - u.tags = append(u.tags, rle...) - } else { - var tags []uint32 - vecs, tags = reorder(union) - u.tags = append(u.tags, tags...) 
- } - for k, vec := range vecs { - if vec != nil && vec.Len() != 0 { - u.values[k].Write(vec) - } - } -} - -func reorderRLE(union *vector.Union, rle []uint32) ([]vector.Any, []uint32) { - vecs := union.Values() - if canonOrder(union.Typ, vecs) { - return vecs, rle - } - if rle[0] == 0 { - rle = rle[1:] + var rleOrTags []uint32 + if len(vec.Typ.Types) == 2 { + rleOrTags = vec.TagsRLE() } else { - rle = append([]uint32{0}, rle...) - } - return []vector.Any{vecs[1], vecs[0]}, rle -} - -func reorder(union *vector.Union) ([]vector.Any, []uint32) { - vecs := union.Values() - if canonOrder(union.Typ, vecs) { - return vecs, union.Tags() - } - tagmap := make([]uint32, len(vecs)) - for inTag, vec := range vecs { - localTag := union.Typ.TagOf(vec.Type()) - if localTag < 0 { - panic(sup.String(vec.Type())) - } - tagmap[inTag] = uint32(localTag) + rleOrTags = vec.Tags() } - tags := make([]uint32, len(union.Tags())) - for k, intag := range union.Tags() { - tags[k] = tagmap[intag] - } - vals := make([]vector.Any, len(union.Typ.Types)) - for inTag, v := range union.Values() { - vals[tagmap[inTag]] = v - } - return vals, tags -} - -func canonOrder(typ *super.TypeUnion, vecs []vector.Any) bool { - for inTag, vec := range vecs { - if inTag != typ.TagOf(vec.Type()) { - return false - } + return &UnionEncoder{ + values: values, + rleOrTags: NewUint32Encoder(rleOrTags), + count: vec.Len(), } - return true } func (u *UnionEncoder) Emit(w io.Writer) error { - if err := u.tagEnc.Emit(w); err != nil { + if err := u.rleOrTags.Emit(w); err != nil { return err } for _, value := range u.values { @@ -127,15 +46,14 @@ func (u *UnionEncoder) Emit(w io.Writer) error { } func (u *UnionEncoder) Encode(group *errgroup.Group) { - u.tagEnc = &Uint32Encoder{vals: u.tags} - u.tagEnc.Encode(group) + u.rleOrTags.Encode(group) for _, value := range u.values { value.Encode(group) } } func (u *UnionEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) { - off, tags := u.tagEnc.Segment(off) + off, tags := 
u.rleOrTags.Segment(off) values := make([]ID, 0, len(u.values)) for _, val := range u.values { var id ID diff --git a/csup/writer.go b/csup/writer.go index 53f52f09cc..11357bf64d 100644 --- a/csup/writer.go +++ b/csup/writer.go @@ -10,6 +10,7 @@ import ( "github.com/brimdata/super/sio/bsupio" "github.com/brimdata/super/sup" "github.com/brimdata/super/vector" + "github.com/brimdata/super/vector/vbuild" "github.com/brimdata/super/vector/vio" ) @@ -19,7 +20,7 @@ var maxObjectSize uint32 = 120_000 // CSUP object from a stream of vector.Any. type Serializer struct { writer io.WriteCloser - dynamic *DynamicEncoder + dynamic *vbuild.DynamicBuilder } var _ vio.Pusher = (*Serializer)(nil) @@ -27,7 +28,7 @@ var _ vio.Pusher = (*Serializer)(nil) func NewSerializer(w io.WriteCloser) *Serializer { return &Serializer{ writer: w, - dynamic: NewDynamicEncoder(), + dynamic: vbuild.NewDynamicBuilder(), } } @@ -42,7 +43,7 @@ func (w *Serializer) Close() error { func (w *Serializer) Push(vec vector.Any) error { if vec.Len() != 0 { w.dynamic.Write(vec) - if w.dynamic.len >= maxObjectSize { + if w.dynamic.Len() >= maxObjectSize { return w.finalizeObject() } } @@ -50,7 +51,8 @@ func (w *Serializer) Push(vec vector.Any) error { } func (w *Serializer) finalizeObject() error { - root, dataSize, err := w.dynamic.Encode() + enc := NewDynamicEncoder(w.dynamic.BuildDynamic()) + root, dataSize, err := enc.Encode() if err != nil { return fmt.Errorf("system error: could not encode CSUP metadata: %w", err) } @@ -59,7 +61,7 @@ func (w *Serializer) finalizeObject() error { var metaBuf bytes.Buffer zw := bsupio.NewWriter(sio.NopCloser(&metaBuf)) // First, we write the root segmap of the vector of integer type IDs. 
- cctx := w.dynamic.cctx + cctx := enc.cctx m := sup.NewBSUPMarshalerWithContext(cctx.local) m.Decorate(sup.StyleSimple) for id := range len(cctx.metas) { @@ -88,11 +90,11 @@ func (w *Serializer) finalizeObject() error { return fmt.Errorf("system error: could not write CSUP metadata section: %w", err) } // Data section - if err := w.dynamic.Emit(w.writer); err != nil { + if err := enc.Emit(w.writer); err != nil { return fmt.Errorf("system error: could not write CSUP data section: %w", err) } // Set new dynamic so we can write the next object. - w.dynamic = NewDynamicEncoder() + w.dynamic = vbuild.NewDynamicBuilder() return nil } diff --git a/csup/ztests/const.yaml b/csup/ztests/const.yaml index 5cee716909..426c3dda99 100644 --- a/csup/ztests/const.yaml +++ b/csup/ztests/const.yaml @@ -12,6 +12,6 @@ inputs: outputs: - name: stdout data: | - {Version:21::uint32,MetaSize:45::uint64,TypeSize:6::uint64,DataSize:0::uint64,Root:0::uint32} + {Version:22::uint32,MetaSize:45::uint64,TypeSize:6::uint64,DataSize:0::uint64,Root:0::uint32} type Const={Value:fusion(all),Count:uint32} {Value:fusion(0x02,),Count:3}::Const diff --git a/runtime/vcache/bool.go b/runtime/vcache/bool.go new file mode 100644 index 0000000000..a734c860c4 --- /dev/null +++ b/runtime/vcache/bool.go @@ -0,0 +1,49 @@ +package vcache + +import ( + "sync" + + "github.com/brimdata/super/csup" + "github.com/brimdata/super/pkg/byteconv" + "github.com/brimdata/super/pkg/field" + "github.com/brimdata/super/vector" + "github.com/brimdata/super/vector/bitvec" +) + +type bool_ struct { + mu sync.Mutex + meta *csup.Bool + bits *bitvec.Bits +} + +func newBool(meta *csup.Bool) *bool_ { + return &bool_{meta: meta} +} + +func (b *bool_) length() uint32 { + return b.meta.Count +} + +func (*bool_) unmarshal(*csup.Context, field.Projection) {} + +func (b *bool_) project(loader *loader, projection field.Projection) vector.Any { + if len(projection) > 0 { + return vector.NewMissing(loader.sctx, b.length()) + } + return 
vector.NewBool(b.load(loader)) +} + +func (b *bool_) load(loader *loader) bitvec.Bits { + b.mu.Lock() + defer b.mu.Unlock() + if b.bits != nil { + return *b.bits + } + bytes := make([]byte, b.meta.Location.MemLength) + if err := b.meta.Location.Read(loader.r, bytes); err != nil { + panic(err) + } + bits := bitvec.New(byteconv.ReinterpretSlice[uint64](bytes), b.meta.Count) + b.bits = &bits + return bits +} diff --git a/runtime/vcache/bytes.go b/runtime/vcache/bytes.go index a30c3c2725..80b1dd02ba 100644 --- a/runtime/vcache/bytes.go +++ b/runtime/vcache/bytes.go @@ -47,18 +47,22 @@ func (b *bytes) load(loader *loader) vector.BytesTable { if b.table != nil { return *b.table } - offs, err := csup.ReadUint32s(b.meta.Offsets, loader.r) + table := loadBytesTable(loader, b.meta.Offsets, b.meta.Bytes) + b.table = &table + return table +} + +func loadBytesTable(loader *loader, offsets, bytes csup.Segment) vector.BytesTable { + offs, err := csup.ReadUint32s(offsets, loader.r) if err != nil { panic(err) } if len(offs) == 0 { offs = []uint32{0} } - bytes := make([]byte, b.meta.Bytes.MemLength) - if err := b.meta.Bytes.Read(loader.r, bytes); err != nil { + b := make([]byte, bytes.MemLength) + if err := bytes.Read(loader.r, b); err != nil { panic(err) } - table := vector.NewBytesTable(offs, bytes) - b.table = &table - return table + return vector.NewBytesTable(offs, b) } diff --git a/runtime/vcache/enum.go b/runtime/vcache/enum.go new file mode 100644 index 0000000000..311d5b3a92 --- /dev/null +++ b/runtime/vcache/enum.go @@ -0,0 +1,33 @@ +package vcache + +import ( + "github.com/brimdata/super/csup" + "github.com/brimdata/super/pkg/field" + "github.com/brimdata/super/vector" +) + +type enum struct { + meta *csup.Enum + values shadow +} + +func (e *enum) length() uint32 { + return e.values.length() +} + +func newEnum(meta *csup.Enum, values shadow) *enum { + return &enum{ + meta: meta, + values: values, + } +} + +func (e *enum) unmarshal(cctx *csup.Context, projection 
field.Projection) { + e.values.unmarshal(cctx, projection) +} + +func (e *enum) project(loader *loader, projection field.Projection) vector.Any { + vec := e.values.project(loader, projection).(*vector.Uint) + enum := loader.sctx.LookupTypeEnum(e.meta.Symbols) + return &vector.Enum{Uint: vec, Typ: enum} +} diff --git a/runtime/vcache/ip.go b/runtime/vcache/ip.go new file mode 100644 index 0000000000..aaafb1d034 --- /dev/null +++ b/runtime/vcache/ip.go @@ -0,0 +1,51 @@ +package vcache + +import ( + "net/netip" + "sync" + + "github.com/brimdata/super/csup" + "github.com/brimdata/super/pkg/field" + "github.com/brimdata/super/vector" +) + +type ip struct { + mu sync.Mutex + meta *csup.IP + vals []netip.Addr +} + +func newIP(meta *csup.IP) *ip { + return &ip{meta: meta} +} + +func (i *ip) length() uint32 { + return i.meta.Count +} + +func (*ip) unmarshal(*csup.Context, field.Projection) {} + +func (i *ip) project(loader *loader, projection field.Projection) vector.Any { + if len(projection) > 0 { + return vector.NewMissing(loader.sctx, i.length()) + } + return vector.NewIP(i.load(loader)) +} + +func (i *ip) load(loader *loader) []netip.Addr { + i.mu.Lock() + defer i.mu.Unlock() + if i.vals != nil { + return i.vals + } + i.vals = make([]netip.Addr, i.meta.Count) + table := loadBytesTable(loader, i.meta.Offsets, i.meta.Bytes) + for k := range table.Len() { + var ok bool + if i.vals[k], ok = netip.AddrFromSlice(table.Bytes(k)); !ok { + panic("malformed ip block") + } + + } + return i.vals +} diff --git a/runtime/vcache/net.go b/runtime/vcache/net.go new file mode 100644 index 0000000000..52af85a28e --- /dev/null +++ b/runtime/vcache/net.go @@ -0,0 +1,50 @@ +package vcache + +import ( + "net/netip" + "sync" + + "github.com/brimdata/super/csup" + "github.com/brimdata/super/pkg/field" + "github.com/brimdata/super/vector" +) + +type net struct { + mu sync.Mutex + meta *csup.Net + vals []netip.Prefix +} + +func newNet(meta *csup.Net) *net { + return &net{meta: meta} +} + +func 
(n *net) length() uint32 { + return n.meta.Count +} + +func (*net) unmarshal(*csup.Context, field.Projection) {} + +func (n *net) project(loader *loader, projection field.Projection) vector.Any { + if len(projection) > 0 { + return vector.NewMissing(loader.sctx, n.length()) + } + return vector.NewNet(n.load(loader)) +} + +func (n *net) load(loader *loader) []netip.Prefix { + n.mu.Lock() + defer n.mu.Unlock() + if n.vals != nil { + return n.vals + } + n.vals = make([]netip.Prefix, n.meta.Count) + table := loadBytesTable(loader, n.meta.Offsets, n.meta.Bytes) + for k := range table.Len() { + if err := n.vals[k].UnmarshalBinary(table.Bytes(k)); err != nil { + panic(err) + } + + } + return n.vals +} diff --git a/runtime/vcache/none.go b/runtime/vcache/none.go new file mode 100644 index 0000000000..513ea4c4db --- /dev/null +++ b/runtime/vcache/none.go @@ -0,0 +1,28 @@ +package vcache + +import ( + "github.com/brimdata/super/csup" + "github.com/brimdata/super/pkg/field" + "github.com/brimdata/super/vector" +) + +type none struct { + meta *csup.None +} + +func newNone(meta *csup.None) *none { + return &none{meta: meta} +} + +func (n *none) length() uint32 { + return n.meta.Count +} + +func (*none) unmarshal(*csup.Context, field.Projection) {} + +func (n *none) project(loader *loader, projection field.Projection) vector.Any { + if len(projection) > 0 { + return vector.NewMissing(loader.sctx, n.length()) + } + return vector.NewNone(n.meta.Count) +} diff --git a/runtime/vcache/null.go b/runtime/vcache/null.go new file mode 100644 index 0000000000..b4f05bc79c --- /dev/null +++ b/runtime/vcache/null.go @@ -0,0 +1,28 @@ +package vcache + +import ( + "github.com/brimdata/super/csup" + "github.com/brimdata/super/pkg/field" + "github.com/brimdata/super/vector" +) + +type null struct { + meta *csup.Null +} + +func newNull(meta *csup.Null) *null { + return &null{meta: meta} +} + +func (n *null) length() uint32 { + return n.meta.Count +} + +func (*null) unmarshal(*csup.Context, 
field.Projection) {} + +func (n *null) project(loader *loader, projection field.Projection) vector.Any { + if len(projection) > 0 { + return vector.NewMissing(loader.sctx, n.length()) + } + return vector.NewNull(n.meta.Count) +} diff --git a/runtime/vcache/primitive.go b/runtime/vcache/primitive.go deleted file mode 100644 index 5d1d1d2472..0000000000 --- a/runtime/vcache/primitive.go +++ /dev/null @@ -1,140 +0,0 @@ -package vcache - -import ( - "fmt" - "net/netip" - "sync" - - "github.com/brimdata/super" - "github.com/brimdata/super/csup" - "github.com/brimdata/super/pkg/field" - "github.com/brimdata/super/scode" - "github.com/brimdata/super/vector" - "github.com/brimdata/super/vector/bitvec" -) - -type primitive struct { - mu sync.Mutex - meta *csup.Primitive - len uint32 - any any -} - -func newPrimitive(cctx *csup.Context, meta *csup.Primitive) *primitive { - return &primitive{meta: meta, len: meta.Len(cctx)} -} - -func (p *primitive) length() uint32 { - return p.len -} - -func (*primitive) unmarshal(*csup.Context, field.Projection) {} - -func (p *primitive) project(loader *loader, projection field.Projection) vector.Any { - if len(projection) > 0 { - return vector.NewMissing(loader.sctx, p.length()) - } - return p.newVector(loader) -} - -func (p *primitive) load(loader *loader) any { - p.mu.Lock() - defer p.mu.Unlock() - if p.any == nil { - p.any = p.loadAnyWithLock(loader) - } - return p.any -} - -func (p *primitive) loadAnyWithLock(loader *loader) any { - bytes := make([]byte, p.meta.Location.MemLength) - if err := p.meta.Location.Read(loader.r, bytes); err != nil { - panic(err) - } - length := p.length() - it := scode.Iter(bytes) - switch p.meta.Typ.(type) { - case *super.TypeOfUint8, *super.TypeOfUint16, *super.TypeOfUint32, *super.TypeOfUint64, *super.TypeEnum: - values := make([]uint64, 0, length) - for range length { - values = append(values, super.DecodeUint(it.Next())) - } - return values - case *super.TypeOfInt8, *super.TypeOfInt16, 
*super.TypeOfInt32, *super.TypeOfInt64, *super.TypeOfDuration, *super.TypeOfTime: - values := make([]int64, 0, length) - for range length { - values = append(values, super.DecodeInt(it.Next())) - } - return values - case *super.TypeOfFloat16, *super.TypeOfFloat32, *super.TypeOfFloat64: - values := make([]float64, 0, length) - for range length { - values = append(values, super.DecodeFloat(it.Next())) - } - return values - case *super.TypeOfBool: - bits := bitvec.NewFalse(length) - for slot := range length { - if super.DecodeBool(it.Next()) { - bits.Set(slot) - } - } - return bits - case *super.TypeOfBytes, *super.TypeOfString: - var bytes []byte - // First offset is always zero. - offs := make([]uint32, 1, length+1) - for range length { - bytes = append(bytes, it.Next()...) - offs = append(offs, uint32(len(bytes))) - } - return vector.NewBytesTable(offs, bytes) - case *super.TypeOfIP: - values := make([]netip.Addr, 0, length) - for range length { - values = append(values, super.DecodeIP(it.Next())) - } - return values - case *super.TypeOfNet: - values := make([]netip.Prefix, 0, length) - for range length { - values = append(values, super.DecodeNet(it.Next())) - } - return values - case *super.TypeOfNull: - return nil - } - panic(fmt.Errorf("internal error: vcache.loadPrimitive got unknown type %#v", p.meta.Typ)) -} - -func (p *primitive) newVector(loader *loader) vector.Any { - switch typ := p.meta.Typ.(type) { - case *super.TypeOfUint8, *super.TypeOfUint16, *super.TypeOfUint32, *super.TypeOfUint64: - return vector.NewUint(typ, p.load(loader).([]uint64)) - case *super.TypeOfInt8, *super.TypeOfInt16, *super.TypeOfInt32, *super.TypeOfInt64, *super.TypeOfDuration, *super.TypeOfTime: - return vector.NewInt(typ, p.load(loader).([]int64)) - case *super.TypeOfFloat16, *super.TypeOfFloat32, *super.TypeOfFloat64: - return vector.NewFloat(typ, p.load(loader).([]float64)) - case *super.TypeOfBool: - return vector.NewBool(p.load(loader).(bitvec.Bits)) - case *super.TypeOfBytes: 
- return vector.NewBytes(p.load(loader).(vector.BytesTable)) - case *super.TypeOfString: - return vector.NewString(p.load(loader).(vector.BytesTable)) - case *super.TypeOfIP: - return vector.NewIP(p.load(loader).([]netip.Addr)) - case *super.TypeOfNet: - return vector.NewNet(p.load(loader).([]netip.Prefix)) - case *super.TypeEnum: - // Despite being coded as a primitive, enums have complex types that - // must live in the query context so we can't use the type in the - // CSUP metadata as that context is local to the CSUP object. - t := loader.sctx.LookupTypeEnum(typ.Symbols) - return vector.NewEnum(t, p.load(loader).([]uint64)) - case *super.TypeOfNull: - return vector.NewNull(p.length()) - case *super.TypeOfNone: - return vector.NewNone(p.length()) - } - panic(fmt.Errorf("internal error: vcache.loadPrimitive got unknown type %#v", p.meta.Typ)) -} diff --git a/runtime/vcache/shadow.go b/runtime/vcache/shadow.go index 3758c0cc5a..16ef2c4f96 100644 --- a/runtime/vcache/shadow.go +++ b/runtime/vcache/shadow.go @@ -61,6 +61,8 @@ func newShadow(cctx *csup.Context, id csup.ID) shadow { return newMap(cctx, meta) case *csup.Union: return newUnion(cctx, meta) + case *csup.Enum: + return newEnum(meta, newShadow(cctx, meta.Values)) case *csup.Fusion: return newFusion(cctx, meta) case *csup.Dict: @@ -71,10 +73,18 @@ func newShadow(cctx *csup.Context, id csup.ID) shadow { return newUint(cctx, meta) case *csup.Float: return newFloat(cctx, meta) + case *csup.Bool: + return newBool(meta) case *csup.Bytes: return newBytes(cctx, meta) - case *csup.Primitive: - return newPrimitive(cctx, meta) + case *csup.IP: + return newIP(meta) + case *csup.Net: + return newNet(meta) + case *csup.Null: + return newNull(meta) + case *csup.None: + return newNone(meta) case *csup.TypeValue: return newTypeValue(cctx, meta) case *csup.Const: diff --git a/vector/vbuild/dynamic.go b/vector/vbuild/dynamic.go index 5e9c267d68..4b0a408e51 100644 --- a/vector/vbuild/dynamic.go +++ b/vector/vbuild/dynamic.go 
@@ -48,15 +48,19 @@ func (d *DynamicBuilder) write(vec vector.Any) uint32 { return i } +func (d *DynamicBuilder) Len() uint32 { + return uint32(len(d.tags)) +} + func (d *DynamicBuilder) Build() vector.Any { - out := d.build() + out := d.BuildDynamic() if len(out.Values) == 1 { return out.Values[0] } return out } -func (d *DynamicBuilder) build() *vector.Dynamic { +func (d *DynamicBuilder) BuildDynamic() *vector.Dynamic { var vecs []vector.Any for _, b := range d.values { vecs = append(vecs, b.Build())