Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 7 additions & 27 deletions csup/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,16 @@ import (
type ArrayEncoder struct {
typ super.Type
values Encoder
offsets *offsetsEncoder
offsets *Uint32Encoder
count uint32
}

var _ Encoder = (*ArrayEncoder)(nil)

func NewArrayEncoder(cctx *Context, typ *super.TypeArray) *ArrayEncoder {
func NewArrayEncoder(cctx *Context, vec *vector.Array) *ArrayEncoder {
return &ArrayEncoder{
typ: typ.Type,
values: NewEncoder(cctx, typ.Type),
offsets: newOffsetsEncoder(),
}
}

func (a *ArrayEncoder) Write(vec vector.Any) {
if vec.Len() == 0 {
return
}
switch vec := vec.(type) {
case *vector.Array:
a.count += vec.Len()
a.values.Write(vec.Values)
a.offsets.write(vec.Offsets)
case *vector.Set:
a.count += vec.Len()
a.values.Write(vec.Values)
a.offsets.write(vec.Offsets)
default:
panic(vec)
values: NewEncoder(cctx, vec.Values),
offsets: NewUint32Encoder(vec.Offsets),
}
}

Expand Down Expand Up @@ -69,12 +50,11 @@ type SetEncoder struct {
ArrayEncoder
}

func NewSetEncoder(cctx *Context, typ *super.TypeSet) *SetEncoder {
func NewSetEncoder(cctx *Context, vec *vector.Set) *SetEncoder {
return &SetEncoder{
ArrayEncoder{
typ: typ.Type,
values: NewEncoder(cctx, typ.Type),
offsets: newOffsetsEncoder(),
values: NewEncoder(cctx, vec.Values),
offsets: &Uint32Encoder{vals: vec.Offsets},
},
}
}
Expand Down
56 changes: 56 additions & 0 deletions csup/bool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package csup

import (
"io"

"github.com/brimdata/super/pkg/byteconv"
"github.com/brimdata/super/vector"
"github.com/brimdata/super/vector/bitvec"
"golang.org/x/sync/errgroup"
)

type BoolEncoder struct {
bits bitvec.Bits

// created on Encode
out []byte
fmt uint8
bytesLen uint64
}

func NewBoolEncoder(vec *vector.Bool) Encoder {
return &BoolEncoder{bits: vec.Bits}
}

func (b *BoolEncoder) Encode(group *errgroup.Group) {
group.Go(func() error {
bytes := byteconv.ReinterpretSlice[byte](b.bits.GetBits())
f, out, err := compressBuffer(bytes)
if err != nil {
return err
}
b.fmt = f
b.out = out
b.bytesLen = uint64(len(bytes))
return nil
})
}

func (b *BoolEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) {
loc := Segment{
Offset: off,
MemLength: uint64(len(b.out)),
Length: b.bytesLen,
CompressionFormat: b.fmt,
}
off += loc.Length
return off, cctx.enter(&Bool{
Location: loc,
Count: b.bits.Len(),
})
}

func (b *BoolEncoder) Emit(w io.Writer) error {
_, err := w.Write(b.out)
return err
}
178 changes: 92 additions & 86 deletions csup/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,130 +12,136 @@ import (

type BytesEncoder struct {
typ super.Type
bytes *BytesTableEncoder
min, max []byte
bytes []byte
offsets *offsetsEncoder

// These values are used for the Encode pass.
bytesFmt uint8
bytesOut []byte
bytesLen uint64
}

func NewBytesEncoder(typ super.Type) *BytesEncoder {
func NewBytesEncoder(typ super.Type, table vector.BytesTable) *BytesEncoder {
return &BytesEncoder{
typ: typ,
offsets: newOffsetsEncoder(),
typ: typ,
bytes: &BytesTableEncoder{table: table},
}
}

func (b *BytesEncoder) Write(vec vector.Any) {
if vec.Len() == 0 {
return
}
switch vec := vec.(type) {
case *vector.Bytes:
b.writeTable(vec.Table())
case *vector.String:
b.writeTable(vec.Table())
default:
panic(vec)
}
func (b *BytesEncoder) Encode(group *errgroup.Group) {
b.bytes.Encode(group)
group.Go(func() error {
table := b.bytes.table
if table.Len() == 0 {
return nil
}
b.min, b.max = table.Bytes(0), table.Bytes(0)
for i := range table.Len() {
v := table.Bytes(i)
if bytes.Compare(v, b.min) < 0 {
b.min = v
}
if bytes.Compare(v, b.max) > 0 {
b.max = v
}
}
return nil
})
}

func (b *BytesEncoder) writeTable(table vector.BytesTable) {
if len(b.bytes) == 0 {
val := table.Bytes(0)
b.min = append(b.min[:0], val...)
b.max = append(b.max[:0], val...)
}
for slot := range table.Len() {
val := table.Bytes(slot)
if bytes.Compare(val, b.min) < 0 {
b.min = append(b.min[:0], val...)
func (b *BytesEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) {
off, bytesLoc, offsLoc := b.bytes.Segment(off)
return off, cctx.enter(&Bytes{
Typ: b.typ,
Bytes: bytesLoc,
Offsets: offsLoc,
Min: b.min,
Max: b.max,
Count: b.bytes.table.Len(),
})
}

func (b *BytesEncoder) Emit(w io.Writer) error {
return b.bytes.Emit(w)
}

func maybeStringBytesDict(typ super.Type, table vector.BytesTable) vector.Any {
flat := func(table vector.BytesTable) vector.Any {
if typ.ID() == super.IDString {
return vector.NewString(table)
}
if bytes.Compare(val, b.max) > 0 {
b.max = append(b.max[:0], val...)
return vector.NewBytes(table)
}
var counts []uint32
m := make(map[string]byte)
index := make([]byte, table.Len())
out := vector.NewBytesTableEmpty(0)
for i := range table.Len() {
tag, ok := m[string(table.Bytes(i))]
if !ok {
if len(counts) > math.MaxUint8 {
return flat(table)
}
tag = byte(len(counts))
b := table.Bytes(i)
m[string(b)] = tag
counts = append(counts, 0)
out.Append(b)
}
index[i] = tag
counts[tag]++
}
b.bytes = append(b.bytes, table.RawBytes()...)
b.offsets.write(table.RawOffsets())
if !isValidDict(int(table.Len()), int(out.Len())) {
return flat(table)
}
vec := flat(out)
if vec.Len() == 1 {
return vector.NewConst(vec, table.Len())
}
return vector.NewDict(vec, index, counts)
}

func (b *BytesEncoder) Encode(group *errgroup.Group) {
type BytesTableEncoder struct {
table vector.BytesTable

// These values are used for the Encode pass.
bytesFmt uint8
bytesOut []byte
bytesLen uint64
offsets *Uint32Encoder
}

func NewBytesTableEncoder(table vector.BytesTable) *BytesTableEncoder {
return &BytesTableEncoder{table: table}
}

func (b *BytesTableEncoder) Encode(group *errgroup.Group) {
group.Go(func() error {
fmt, out, err := compressBuffer(b.bytes)
bytes := b.table.RawBytes()
fmt, out, err := compressBuffer(bytes)
if err != nil {
return err
}
b.bytesFmt = fmt
b.bytesOut = out
b.bytesLen = uint64(len(b.bytes))
b.bytes = nil // send to GC
b.bytesLen = uint64(len(bytes))
return nil
})
b.offsets = NewUint32Encoder(b.table.RawOffsets())
b.offsets.Encode(group)
}

func (b *BytesEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) {
func (b *BytesTableEncoder) Segment(off uint64) (uint64, Segment, Segment) {
bytesLoc := Segment{
Offset: off,
Length: uint64(len(b.bytesOut)),
MemLength: b.bytesLen,
CompressionFormat: b.bytesFmt,
}
off, offsLoc := b.offsets.Segment(off + bytesLoc.Length)
return off, cctx.enter(&Bytes{
Typ: b.typ,
Bytes: bytesLoc,
Offsets: offsLoc,
Min: b.min,
Max: b.max,
Count: uint32(len(b.offsets.vals) - 1),
})
return off, bytesLoc, offsLoc
}

func (b *BytesEncoder) Emit(w io.Writer) error {
func (b *BytesTableEncoder) Emit(w io.Writer) error {
if len(b.bytesOut) > 0 {
if _, err := w.Write(b.bytesOut); err != nil {
return err
}
}
return b.offsets.Emit(w)
}

func (b *BytesEncoder) value(slot uint32) []byte {
return b.bytes[b.offsets.vals[slot]:b.offsets.vals[slot+1]]
}

func (b *BytesEncoder) Dict() (PrimitiveEncoder, []byte, []uint32) {
if len(b.bytes) == 0 {
return nil, nil, nil
}
m := make(map[string]byte)
var counts []uint32
index := make([]byte, len(b.offsets.vals)-1)
table := vector.NewBytesTableEmpty(256)
for k := range uint32(len(index)) {
tag, ok := m[string(b.value(k))]
if !ok {
tag = byte(len(counts))
v := b.value(k)
m[string(v)] = tag
table.Append(v)
counts = append(counts, 0)
if len(counts) > math.MaxUint8 {
return nil, nil, nil
}
}
index[k] = tag
counts[tag]++
}
encoder := NewBytesEncoder(b.typ)
encoder.Write(vector.NewBytes(table))
return encoder, index, counts
}

func (b *BytesEncoder) ConstValue() super.Value {
return super.NewValue(b.typ, b.value(0))
}
21 changes: 21 additions & 0 deletions csup/const.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package csup

import (
"io"

"github.com/brimdata/super"
"golang.org/x/sync/errgroup"
)

type ConstEncoder struct {
val super.Value
len uint32
}

func (*ConstEncoder) Encode(group *errgroup.Group) {}

func (c *ConstEncoder) Metadata(cctx *Context, off uint64) (uint64, ID) {
return off, cctx.enter(&Const{c.val, c.len})
}

func (*ConstEncoder) Emit(w io.Writer) error { return nil }
Loading
Loading