diff --git a/Makefile b/Makefile index 06f2b97..2d077dc 100644 --- a/Makefile +++ b/Makefile @@ -14,21 +14,24 @@ lint: build: go build -v ./... -.PHONY: bench bench-publish bench-consume bench-get bench-multi +.PHONY: bench bench-publish bench-consume bench-get bench-multi bench-keys bench: go test -bench=. -benchmem -run XXX bench-publish: - go test -bench=BenchmarkSingle/Publish -benchmem -run XXX + go test -bench=BenchmarkSingle/Publish/V2 -benchmem -run XXX bench-consume: - go test -bench=BenchmarkSingle/Consume -benchmem -run XXX + go test -bench=BenchmarkSingle/Consume/V2 -benchmem -run XXX bench-get: - go test -bench=BenchmarkSingle/Get -benchmem -run XXX + go test -bench=BenchmarkSingle/Get/V2 -benchmem -run XXX bench-multi: - go test -bench=BenchmarkMulti -benchmem -run XXX + go test -bench=BenchmarkMulti/V2 -benchmem -run XXX + +bench-keys: + go test -bench=BenchmarkKeys -benchmem -run XXX .PHONY: update-libs update-libs: diff --git a/bench_test.go b/bench_test.go index 06fd01b..4c3bb66 100644 --- a/bench_test.go +++ b/bench_test.go @@ -49,6 +49,15 @@ func BenchmarkMulti(b *testing.B) { }) } +func BenchmarkKeys(b *testing.B) { + opts := Options{KeyIndex: true, Version: v2opts} + b.Run("Publish", func(b *testing.B) { benchmarkPublishOptions(b, 8, opts) }) + b.Run("Consume/W", func(b *testing.B) { benchmarkConsumeOptionsW(b, 8, opts) }) + b.Run("Consume/R", func(b *testing.B) { benchmarkConsumeOptionsR(b, 8, opts) }) + b.Run("ByKey/W", func(b *testing.B) { benchmarkGetKeyW(b, v2opts) }) + b.Run("ByKey/A", func(b *testing.B) { benchmarkGetKeyA(b, v2opts) }) +} + func MkdirBench(b *testing.B) string { name := strings.ReplaceAll(b.Name(), "/", "_") @@ -73,30 +82,34 @@ func benchmarkPublish(b *testing.B, version VersionOptions) { for _, bn := range []int{1, 8} { for _, c := range cases { b.Run(fmt.Sprintf("%d/%s", bn, c.name), func(b *testing.B) { - dir := MkdirBench(b) - defer os.RemoveAll(dir) + benchmarkPublishOptions(b, bn, c.opts) + }) + } + } +} - s, err := Open(dir, c.opts) - require.NoError(b, err) - defer s.Close() +func benchmarkPublishOptions(b *testing.B, bn int, opts Options) { + dir := MkdirBench(b) + defer os.RemoveAll(dir) - msgs := message.Gen(b.N) + s, err := Open(dir, opts) + require.NoError(b, err) + defer s.Close() - b.SetBytes(s.Size(msgs[0]) * int64(bn)) - b.ResetTimer() + msgs := message.Gen(b.N) - for i := 0; i < b.N; i += bn { - top := min(i+bn, b.N) + b.SetBytes(s.Size(msgs[0]) * int64(bn)) + b.ResetTimer() - if _, err := s.Publish(msgs[i:top]); err != nil { - b.Fatal(err) - } - } + for i := 0; i < b.N; i += bn { + top := min(i+bn, b.N) - b.StopTimer() - }) + if _, err := s.Publish(msgs[i:top]); err != nil { + b.Fatal(err) } } + + b.StopTimer() } func benchmarkPublishMulti(b *testing.B, version VersionOptions) { @@ -153,84 +166,95 @@ func benchmarkConsume(b *testing.B, version VersionOptions) { for _, bn := range []int{1, 8} { for _, c := range cases { b.Run(fmt.Sprintf("W/%s/%d", c.name, bn), func(b *testing.B) { - dir := MkdirBench(b) - defer os.RemoveAll(dir) + benchmarkConsumeOptionsW(b, bn, c.opts) + }) - l, err := Open(dir, c.opts) - require.NoError(b, err) - defer l.Close() + b.Run(fmt.Sprintf("A/%s/%d", c.name, bn), func(b *testing.B) { + benchmarkConsumeOptionsA(b, bn, c.opts) + }) - msgs := fillLog(b, l) + b.Run(fmt.Sprintf("R/%s/%d", c.name, bn), func(b *testing.B) { + benchmarkConsumeOptionsR(b, bn, c.opts) + }) + } + } +} - b.SetBytes(l.Size(msgs[0]) * int64(bn)) - b.ResetTimer() +func benchmarkConsumeOptionsW(b *testing.B, bn int, opts Options) { + dir := MkdirBench(b) + defer os.RemoveAll(dir) - for i := 0; i < b.N; i += bn { - if _, _, err := l.Consume(int64(i), int64(bn)); err != nil { - b.Fatal(err) - } - } + l, err := Open(dir, opts) + require.NoError(b, err) + defer l.Close() - b.StopTimer() - }) + msgs := fillLog(b, l) - b.Run(fmt.Sprintf("A/%s/%d", c.name, bn), func(b *testing.B) { - dir := MkdirBench(b) - defer os.RemoveAll(dir) + b.SetBytes(l.Size(msgs[0]) * int64(bn)) + b.ResetTimer() + + for i := 0; i < b.N; i += bn { + if _, _, err := l.Consume(int64(i), int64(bn)); err != nil { + b.Fatal(err) + } + } - l, err := Open(dir, c.opts) - require.NoError(b, err) - defer l.Close() + b.StopTimer() +} - msgs := fillLog(b, l) - require.NoError(b, l.Close()) +func benchmarkConsumeOptionsA(b *testing.B, bn int, opts Options) { + dir := MkdirBench(b) + defer os.RemoveAll(dir) - b.SetBytes(l.Size(msgs[0]) * int64(bn)) - b.ResetTimer() + l, err := Open(dir, opts) + require.NoError(b, err) + defer l.Close() - l, err = Open(dir, c.opts) - require.NoError(b, err) - defer l.Close() + msgs := fillLog(b, l) + require.NoError(b, l.Close()) - for i := 0; i < b.N; i += bn { - if _, _, err := l.Consume(int64(i), int64(bn)); err != nil { - b.Fatal(err) - } - } + b.SetBytes(l.Size(msgs[0]) * int64(bn)) + b.ResetTimer() - b.StopTimer() - }) + l, err = Open(dir, opts) + require.NoError(b, err) + defer l.Close() - b.Run(fmt.Sprintf("R/%s/%d", c.name, bn), func(b *testing.B) { - dir := MkdirBench(b) - defer os.RemoveAll(dir) + for i := 0; i < b.N; i += bn { + if _, _, err := l.Consume(int64(i), int64(bn)); err != nil { + b.Fatal(err) + } + } - l, err := Open(dir, c.opts) - require.NoError(b, err) - defer l.Close() + b.StopTimer() +} - msgs := fillLog(b, l) - require.NoError(b, l.Close()) +func benchmarkConsumeOptionsR(b *testing.B, bn int, opts Options) { + dir := MkdirBench(b) + defer os.RemoveAll(dir) - b.SetBytes(l.Size(msgs[0]) * int64(bn)) - b.ResetTimer() + l, err := Open(dir, opts) + require.NoError(b, err) + defer l.Close() - opts := c.opts - opts.Readonly = true - l, err = Open(dir, opts) - require.NoError(b, err) - defer l.Close() + msgs := fillLog(b, l) + require.NoError(b, l.Close()) - for i := 0; i < b.N; i += bn { - if _, _, err := l.Consume(int64(i), int64(bn)); err != nil { - b.Fatal(err) - } - } + b.SetBytes(l.Size(msgs[0]) * int64(bn)) + b.ResetTimer() - b.StopTimer() - }) + opts.Readonly = true + l, err = Open(dir, opts) + require.NoError(b, err) + defer l.Close() + + for i := 0; i < b.N; i += bn { + if _, _, err := l.Consume(int64(i), int64(bn)); err != nil { + b.Fatal(err) } } + + b.StopTimer() } func benchmarkConsumeMulti(b *testing.B, version VersionOptions) { @@ -291,10 +315,18 @@ func benchmarkGet(b *testing.B, version VersionOptions) { }) b.Run("ByKey/W", func(b *testing.B) { + benchmarkGetKeyW(b, version) + }) + + b.Run("ByKey/A", func(b *testing.B) { + benchmarkGetKeyA(b, version) + }) + + b.Run("ByTime/W", func(b *testing.B) { dir := MkdirBench(b) defer os.RemoveAll(dir) - l, err := Open(dir, Options{KeyIndex: true, Version: version}) + l, err := Open(dir, Options{TimeIndex: true, Version: version}) require.NoError(b, err) defer l.Close() @@ -304,7 +336,7 @@ func benchmarkGet(b *testing.B, version VersionOptions) { b.ResetTimer() for i := 0; i < b.N; i++ { - if _, err := l.GetByKey(msgs[i].Key); err != nil { + if _, err := l.GetByTime(msgs[i].Time); err != nil { b.Fatal(err) } } @@ -312,11 +344,11 @@ func benchmarkGet(b *testing.B, version VersionOptions) { b.StopTimer() }) - b.Run("ByKey/A", func(b *testing.B) { + b.Run("ByTime/A", func(b *testing.B) { dir := MkdirBench(b) defer os.RemoveAll(dir) - l, err := Open(dir, Options{KeyIndex: true, Version: version}) + l, err := Open(dir, Options{TimeIndex: true, Version: version}) require.NoError(b, err) defer l.Close() @@ -326,67 +358,67 @@ func benchmarkGet(b *testing.B, version VersionOptions) { b.SetBytes(l.Size(msgs[0])) b.ResetTimer() - l, err = Open(dir, Options{KeyIndex: true, Readonly: true, Version: version}) + l, err = Open(dir, Options{TimeIndex: true, Readonly: true, Version: version}) require.NoError(b, err) defer l.Close() for i := 0; i < b.N; i++ { - if _, err := l.GetByKey(msgs[i].Key); err != nil { + if _, err := l.GetByTime(msgs[i].Time); err != nil { b.Fatal(err) } } b.StopTimer() }) +} - b.Run("ByTime/W", func(b *testing.B) { - dir := MkdirBench(b) - defer os.RemoveAll(dir) +func benchmarkGetKeyW(b *testing.B, version VersionOptions) { + dir := MkdirBench(b) + defer os.RemoveAll(dir) - l, err := Open(dir, Options{TimeIndex: true, Version: version}) - require.NoError(b, err) - defer l.Close() + l, err := Open(dir, Options{KeyIndex: true, Version: version}) + require.NoError(b, err) + defer l.Close() - msgs := fillLog(b, l) + msgs := fillLog(b, l) - b.SetBytes(l.Size(msgs[0])) - b.ResetTimer() + b.SetBytes(l.Size(msgs[0])) + b.ResetTimer() - for i := 0; i < b.N; i++ { - if _, err := l.GetByTime(msgs[i].Time); err != nil { - b.Fatal(err) - } + for i := 0; i < b.N; i++ { + if _, err := l.GetByKey(msgs[i].Key); err != nil { + b.Fatal(err) } + } - b.StopTimer() - }) + b.StopTimer() +} - b.Run("ByTime/A", func(b *testing.B) { - dir := MkdirBench(b) - defer os.RemoveAll(dir) +func benchmarkGetKeyA(b *testing.B, version VersionOptions) { + dir := MkdirBench(b) + defer os.RemoveAll(dir) - l, err := Open(dir, Options{TimeIndex: true, Version: version}) - require.NoError(b, err) - defer l.Close() + l, err := Open(dir, Options{KeyIndex: true, Version: version}) + require.NoError(b, err) + defer l.Close() - msgs := fillLog(b, l) - require.NoError(b, l.Close()) + msgs := fillLog(b, l) + require.NoError(b, l.Close()) - b.SetBytes(l.Size(msgs[0])) - b.ResetTimer() + b.SetBytes(l.Size(msgs[0])) + b.ResetTimer() - l, err = Open(dir, Options{TimeIndex: true, Readonly: true, Version: version}) - require.NoError(b, err) - defer l.Close() + l, err = Open(dir, Options{KeyIndex: true, Readonly: true, Version: version}) + require.NoError(b, err) + defer l.Close() - for i := 0; i < b.N; i++ { - if _, err := l.GetByTime(msgs[i].Time); err != nil { - b.Fatal(err) - } + for i := 0; i < b.N; i++ { + if _, err := l.GetByKey(msgs[i].Key); err != nil { + b.Fatal(err) } + } - b.StopTimer() - }) + b.StopTimer() } func benchmarkGetKeyMulti(b *testing.B, version VersionOptions) { diff --git a/log_reader.go b/log_reader.go index fde53ca..fcd784c 100644 --- a/log_reader.go +++ b/log_reader.go @@ -122,7 +122,7 @@ func (r *reader) Consume(offset, maxCount int64) (int64, []message.Message, erro return msgs[len(msgs)-1].Offset + 1, msgs, nil } -func (r *reader) ConsumeByKey(key, keyHash []byte, offset, maxCount int64) (int64, []message.Message, error) { +func (r *reader) ConsumeByKey(key []byte, keyHash []byte, offset, maxCount int64) (int64, []message.Message, error) { ix, err := r.getIndexNow() if err != nil { return OffsetInvalid, nil, err @@ -204,7 +204,7 @@ func (r *reader) Get(offset int64) (message.Message, error) { return messages.Get(position) } -func (r *reader) GetByKey(key, keyHash []byte, tctx int64) (message.Message, error) { +func (r *reader) GetByKey(key []byte, keyHash []byte, tctx int64) (message.Message, error) { ix, err := r.getIndexAt(tctx) if err != nil { return message.Invalid, err diff --git a/pkg/index/keys.go b/pkg/index/keys.go index f44efc3..f78593d 100644 --- a/pkg/index/keys.go +++ b/pkg/index/keys.go @@ -24,24 +24,28 @@ func KeyHashEncoded(h uint64) []byte { return hash } +type keyPositions struct { + positions []int64 +} + func AppendKeys(keys art.Tree, items []Item) { hash := make([]byte, 8) for _, item := range items { binary.BigEndian.PutUint64(hash, item.KeyHash) - var positions []int64 if v, found := keys.Search(hash); found { - positions = v.([]int64) + kp := v.(*keyPositions) + kp.positions = append(kp.positions, item.Position) + } else { + keys.Insert(hash, &keyPositions{[]int64{item.Position}}) } - positions = append(positions, item.Position) - - keys.Insert(hash, positions) } } func Keys(keys art.Tree, keyHash []byte) ([]int64, error) { if v, found := keys.Search(keyHash); found { - return v.([]int64), nil + kp := v.(*keyPositions) + return kp.positions, nil } return nil, ErrKeyNotFound