From 08adff82b5168438f1ec649390240fb2a08509ab Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Sat, 25 Apr 2026 15:05:18 -0400 Subject: [PATCH 1/4] delete returns all deleted messages --- api.go | 5 +++-- compact_deletes.go | 4 ++-- compact_deletes_test.go | 13 +++++++++++-- compact_updates.go | 4 ++-- compact_updates_test.go | 21 +++++++++++++-------- delete.go | 21 ++++++++++----------- delete_test.go | 5 +++-- log.go | 10 +++++----- log_test.go | 41 +++++++++++++++++++++++++---------------- log_writer.go | 6 +++--- pkg/segment/segment.go | 9 +++++---- trim_age.go | 4 ++-- trim_count.go | 4 ++-- trim_offset.go | 4 ++-- trim_size.go | 4 ++-- typed.go | 14 +++++++++++++- 16 files changed, 103 insertions(+), 66 deletions(-) diff --git a/api.go b/api.go index b2e0b4c..92c8872 100644 --- a/api.go +++ b/api.go @@ -145,8 +145,8 @@ type Log interface { // Delete tries to delete a set of messages by their offset // from the log and returns the amount of storage deleted // It does not guarantee that it will delete all messages, - // it returns the set of actually deleted offsets. - Delete(offsets map[int64]struct{}) (deletedOffsets map[int64]struct{}, deletedSize int64, err error) + // it returns list of actually deleted messages. + Delete(offsets map[int64]struct{}) (deletedMessages []Message, deletedSize int64, err error) // Size returns the amount of storage a message occupies in the // NewSegmentsVersion format (see VersionOptions), plus the index overhead. @@ -201,6 +201,7 @@ func Recover(dir string, opts Options) error { }) } +// Migrate rewrites all segments with a concrete options and version func Migrate(dir string, opts Options, version Version) error { if version == vUnknown { return fmt.Errorf("migrate: version must be specified (e.g. klevdb.V2)") diff --git a/compact_deletes.go b/compact_deletes.go index 0cee06b..a7ea9a9 100644 --- a/compact_deletes.go +++ b/compact_deletes.go @@ -67,7 +67,7 @@ SEARCH: // and are therefore no longer relevant/active. // // returns the offsets it deleted and the amount of storage freed -func CompactDeletes(ctx context.Context, l Log, before time.Time) (map[int64]struct{}, int64, error) { +func CompactDeletes(ctx context.Context, l Log, before time.Time) ([]Message, int64, error) { offsets, err := FindDeletes(ctx, l, before) if err != nil { return nil, 0, err @@ -76,7 +76,7 @@ func CompactDeletes(ctx context.Context, l Log, before time.Time) (map[int64]str } // CompactDeletesMulti is similar to Deletes, but will try to remove messages from multiple segments -func CompactDeletesMulti(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { +func CompactDeletesMulti(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) ([]Message, int64, error) { offsets, err := FindDeletes(ctx, l, before) if err != nil { return nil, 0, err diff --git a/compact_deletes_test.go b/compact_deletes_test.go index f88b69b..c185ad9 100644 --- a/compact_deletes_test.go +++ b/compact_deletes_test.go @@ -55,12 +55,21 @@ func TestDeletes(t *testing.T) { _, err = l.Publish(msgs) require.NoError(t, err) - off, cmp, err := CompactDeletes(context.TODO(), l, time.Now()) + msgs, cmp, err := CompactDeletes(context.TODO(), l, time.Now()) require.NoError(t, err) - require.Len(t, off, 5) + require.Len(t, msgs, 5) + off := getOffsets(msgs) for i := range nmsgs { require.Contains(t, off, int64(i)) } require.Equal(t, l.Size(nmsgs[0])*int64(len(nmsgs)), cmp) }) } + +func getOffsets(msgs []Message) map[int64]struct{} { + offsets := make(map[int64]struct{}, len(msgs)) + for _, msg := range msgs { + offsets[msg.Offset] = struct{}{} + } + return offsets +} diff --git a/compact_updates.go b/compact_updates.go index 51c315e..885ff77 100644 --- a/compact_updates.go +++ b/compact_updates.go @@ -58,7 +58,7 @@ SEARCH: // leaving only the current value (last update) for a key. // // returns the offsets it deleted and the amount of storage freed -func CompactUpdates(ctx context.Context, l Log, before time.Time) (map[int64]struct{}, int64, error) { +func CompactUpdates(ctx context.Context, l Log, before time.Time) ([]Message, int64, error) { offsets, err := FindUpdates(ctx, l, before) if err != nil { return nil, 0, err @@ -67,7 +67,7 @@ func CompactUpdates(ctx context.Context, l Log, before time.Time) (map[int64]str } // UpdatesMulti is similar to Updates, but will try to remove messages from multiple segments -func CompactUpdatesMulti(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { +func CompactUpdatesMulti(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) ([]Message, int64, error) { offsets, err := FindUpdates(ctx, l, before) if err != nil { return nil, 0, err diff --git a/compact_updates_test.go b/compact_updates_test.go index 4b744f9..0b86a6b 100644 --- a/compact_updates_test.go +++ b/compact_updates_test.go @@ -56,9 +56,10 @@ func TestUpdates(t *testing.T) { _, err = l.Publish(dmsgs) require.NoError(t, err) - off, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) + deletedMsgs, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) + off := getOffsets(deletedMsgs) require.NoError(t, err) - require.Len(t, off, 1) + require.Len(t, deletedMsgs, 1) require.Contains(t, off, int64(0)) require.Equal(t, l.Size(msgs[0]), cmp) @@ -80,9 +81,10 @@ func TestUpdates(t *testing.T) { _, err = l.Publish(dmsgs) require.NoError(t, err) - off, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) + deletedMsgs, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) + off := getOffsets(deletedMsgs) require.NoError(t, err) - require.Len(t, off, 1) + require.Len(t, deletedMsgs, 1) require.Contains(t, off, int64(4)) require.Equal(t, l.Size(msgs[0]), cmp) @@ -102,9 +104,10 @@ func TestUpdates(t *testing.T) { _, err = l.Publish(msgs) require.NoError(t, err) - off, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) + deletedMsgs, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) + off := getOffsets(deletedMsgs) require.NoError(t, err) - require.Len(t, off, len(msgs)) + require.Len(t, deletedMsgs, len(msgs)) for i := range msgs { require.Contains(t, off, int64(i)) } @@ -130,7 +133,8 @@ func TestUpdates(t *testing.T) { _, err = l.Publish(nmsgs) require.NoError(t, err) - off, cmp, err := CompactUpdates(context.TODO(), l, nmsgs[2].Time) + deletedMsgs, cmp, err := CompactUpdates(context.TODO(), l, nmsgs[2].Time) + off := getOffsets(deletedMsgs) require.NoError(t, err) require.Len(t, off, 3) for i := range 3 { @@ -155,7 +159,8 @@ func TestUpdates(t *testing.T) { }) require.NoError(t, err) - off, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) + deletedMsgs, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) + off := getOffsets(deletedMsgs) require.NoError(t, err) require.Len(t, off, 1) require.Contains(t, off, int64(1)) diff --git a/delete.go b/delete.go index d2e821f..cdb7975 100644 --- a/delete.go +++ b/delete.go @@ -32,31 +32,30 @@ func DeleteMultiWithWait(d time.Duration) DeleteMultiBackoff { // // [DeleteMultiBackoff] is called on each iteration to give // others a chance to work with the log, while being deleted -func DeleteMulti(ctx context.Context, l Log, offsets map[int64]struct{}, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { +func DeleteMulti(ctx context.Context, l Log, offsets map[int64]struct{}, backoff DeleteMultiBackoff) ([]Message, int64, error) { var remainingOffsets = maps.Clone(offsets) - var deletedOffsets = map[int64]struct{}{} + var deletedMessages []Message var deletedSize int64 for len(remainingOffsets) > 0 { deleted, size, err := l.Delete(remainingOffsets) switch { case err != nil: - return deletedOffsets, deletedSize, err + return deletedMessages, deletedSize, err case len(deleted) == 0: - return deletedOffsets, deletedSize, nil + return deletedMessages, deletedSize, nil } - maps.Copy(deletedOffsets, deleted) + deletedMessages = append(deletedMessages, deleted...) deletedSize += size - maps.DeleteFunc(remainingOffsets, func(k int64, v struct{}) bool { - _, ok := deleted[k] - return ok - }) + for _, msg := range deleted { + delete(remainingOffsets, msg.Offset) + } if err := backoff(ctx); err != nil { - return deletedOffsets, deletedSize, err + return deletedMessages, deletedSize, err } } - return deletedOffsets, deletedSize, nil + return deletedMessages, deletedSize, nil } diff --git a/delete_test.go b/delete_test.go index 9295dd1..b858916 100644 --- a/delete_test.go +++ b/delete_test.go @@ -28,14 +28,15 @@ func TestDeleteMulti(t *testing.T) { require.Equal(t, 10, stats.Messages) require.Equal(t, 5, stats.Segments) - offsets, sz, err := DeleteMulti(context.TODO(), l, map[int64]struct{}{ + msgs, sz, err := DeleteMulti(context.TODO(), l, map[int64]struct{}{ 0: {}, 2: {}, 3: {}, 4: {}, }, DeleteMultiWithWait(time.Millisecond)) require.NoError(t, err) - require.Len(t, offsets, 4) + require.Len(t, msgs, 4) + offsets := getOffsets(msgs) require.Contains(t, offsets, int64(0)) require.Contains(t, offsets, int64(2)) require.Contains(t, offsets, int64(3)) diff --git a/log.go b/log.go index 7c35362..853f0c1 100644 --- a/log.go +++ b/log.go @@ -358,7 +358,7 @@ func (l *log) OffsetByTime(start time.Time) (int64, time.Time, error) { return msg.Offset, msg.Time, nil } -func (l *log) Delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, error) { +func (l *log) Delete(offsets map[int64]struct{}) ([]Message, int64, error) { if l.opts.Readonly { return nil, 0, ErrReadonly } @@ -373,7 +373,7 @@ func (l *log) Delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, err return l.delete(offsets) } -func (l *log) delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, error) { +func (l *log) delete(offsets map[int64]struct{}) ([]Message, int64, error) { rdr, err := l.findDeleteReader(offsets) if err != nil { return nil, 0, err @@ -418,7 +418,7 @@ func (l *log) delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, err return nil, 0, err } - if len(rs.DeletedOffsets) == 0 { + if len(rs.DeletedMessages) == 0 { // deleted nothing, just remove rewrite files return nil, 0, rs.Remove() } @@ -447,7 +447,7 @@ func (l *log) delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, err l.readers = append(l.readers, newWriter.reader) } - return rs.DeletedOffsets, rs.DeletedSize, nil + return rs.DeletedMessages, rs.DeletedSize, nil } l.writerMu.Unlock() @@ -480,7 +480,7 @@ func (l *log) delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, err } l.readers = newReaders - return rs.DeletedOffsets, rs.DeletedSize, nil + return rs.DeletedMessages, rs.DeletedSize, nil } func (l *log) findDeleteReader(offsets map[int64]struct{}) (*reader, error) { diff --git a/log_test.go b/log_test.go index 10afbe6..1b0f812 100644 --- a/log_test.go +++ b/log_test.go @@ -1155,11 +1155,12 @@ func testDeleteReaderPartial(t *testing.T) { require.Equal(t, 4, stats.Messages) require.Equal(t, 2, stats.Segments) - offsets, sz, err := l.Delete(map[int64]struct{}{ + deletedMsgs, sz, err := l.Delete(map[int64]struct{}{ 0: {}, }) + offsets := getOffsets(deletedMsgs) require.NoError(t, err) - require.Len(t, offsets, 1) + require.Len(t, deletedMsgs, 1) require.Contains(t, offsets, int64(0)) require.Equal(t, l.Size(msgs[0]), sz) @@ -1208,9 +1209,10 @@ func testDeleteReaderPartialReload(t *testing.T) { require.Equal(t, 4, stats.Messages) require.Equal(t, 2, stats.Segments) - offsets, sz, err := l.Delete(map[int64]struct{}{ + deletedMsgs, sz, err := l.Delete(map[int64]struct{}{ 0: {}, }) + offsets := getOffsets(deletedMsgs) require.NoError(t, err) require.Len(t, offsets, 1) require.Contains(t, offsets, int64(0)) @@ -1251,12 +1253,13 @@ func testDeleteReaderFull(t *testing.T) { require.Equal(t, 4, stats.Messages) require.Equal(t, 2, stats.Segments) - offsets, sz, err := l.Delete(map[int64]struct{}{ + deletedMsgs, sz, err := l.Delete(map[int64]struct{}{ 0: {}, 1: {}, }) + offsets := getOffsets(deletedMsgs) require.NoError(t, err) - require.Len(t, offsets, 2) + require.Len(t, deletedMsgs, 2) require.Contains(t, offsets, int64(0)) require.Contains(t, offsets, int64(1)) require.Equal(t, l.Size(msgs[0])*2, sz) @@ -1290,12 +1293,13 @@ func testDeleteWriterSingle(t *testing.T) { require.Equal(t, 4, stats.Messages) require.Equal(t, 1, stats.Segments) - offsets, sz, err := l.Delete(map[int64]struct{}{ + deletedMsgs, sz, err := l.Delete(map[int64]struct{}{ 0: {}, 1: {}, }) + offsets := getOffsets(deletedMsgs) require.NoError(t, err) - require.Len(t, offsets, 2) + require.Len(t, deletedMsgs, 2) require.Contains(t, offsets, int64(0)) require.Contains(t, offsets, int64(1)) require.Equal(t, l.Size(msgs[0])*2, sz) @@ -1332,9 +1336,10 @@ func testDeleteWriterLast(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(4), nextOffset) - offsets, sz, err := l.Delete(map[int64]struct{}{ + deletedMsgs, sz, err := l.Delete(map[int64]struct{}{ 3: {}, }) + offsets := getOffsets(deletedMsgs) require.NoError(t, err) require.Len(t, offsets, 1) require.Contains(t, offsets, int64(3)) @@ -1378,11 +1383,12 @@ func testDeleteWriterPartial(t *testing.T) { require.Equal(t, 4, stats.Messages) require.Equal(t, 2, stats.Segments) - offsets, sz, err := l.Delete(map[int64]struct{}{ + deletedMsgs, sz, err := l.Delete(map[int64]struct{}{ 2: {}, }) + offsets := getOffsets(deletedMsgs) require.NoError(t, err) - require.Len(t, offsets, 1) + require.Len(t, deletedMsgs, 1) require.Contains(t, offsets, int64(2)) require.Equal(t, l.Size(msgs[0]), sz) @@ -1423,12 +1429,13 @@ func testDeleteWriterFull(t *testing.T) { require.Equal(t, 4, stats.Messages) require.Equal(t, 2, stats.Segments) - offsets, sz, err := l.Delete(map[int64]struct{}{ + deletedMsgs, sz, err := l.Delete(map[int64]struct{}{ 2: {}, 3: {}, }) + offsets := getOffsets(deletedMsgs) require.NoError(t, err) - require.Len(t, offsets, 2) + require.Len(t, deletedMsgs, 2) require.Contains(t, offsets, int64(2)) require.Contains(t, offsets, int64(3)) require.Equal(t, l.Size(msgs[0])*2, sz) @@ -1472,23 +1479,25 @@ func testDeleteAll(t *testing.T) { publishBatched(t, l, msgs, 1) // delete the writer segment - offsets, sz, err := l.Delete(map[int64]struct{}{ + deletedMsgs, sz, err := l.Delete(map[int64]struct{}{ 2: {}, 3: {}, }) + offsets := getOffsets(deletedMsgs) require.NoError(t, err) - require.Len(t, offsets, 2) + require.Len(t, deletedMsgs, 2) require.Contains(t, offsets, int64(2)) require.Contains(t, offsets, int64(3)) require.Equal(t, l.Size(msgs[0])*2, sz) // delete the reader segment - offsets, sz, err = l.Delete(map[int64]struct{}{ + deletedMsgs, sz, err = l.Delete(map[int64]struct{}{ 0: {}, 1: {}, }) + offsets = getOffsets(deletedMsgs) require.NoError(t, err) - require.Len(t, offsets, 2) + require.Len(t, deletedMsgs, 2) require.Contains(t, offsets, int64(0)) require.Contains(t, offsets, int64(1)) require.Equal(t, l.Size(msgs[0])*2, sz) diff --git a/log_writer.go b/log_writer.go index b279352..0427d15 100644 --- a/log_writer.go +++ b/log_writer.go @@ -112,7 +112,7 @@ func (w *writer) Delete(rs *segment.RewriteSegment) (*writer, *reader, error) { return nil, nil, err } - if len(rs.SurviveOffsets)+len(rs.DeletedOffsets) != w.index.Len() { + if len(rs.SurviveOffsets)+len(rs.DeletedMessages) != w.index.Len() { // the number of messages changed, nothing to drop if err := rs.Remove(); err != nil { return nil, nil, err @@ -155,7 +155,7 @@ func (w *writer) Delete(rs *segment.RewriteSegment) (*writer, *reader, error) { // first move the replacement nextOffset, nextTime := w.index.getNext() - if _, ok := rs.DeletedOffsets[w.index.getLastOffset()]; ok { + if rs.DeletedMessages[len(rs.DeletedMessages)-1].Offset == w.index.getLastOffset() { rdr := openReader(nseg, w.params, w.version, false) wrt, err := openWriter(w.segment.NewAt(nextOffset), w.params, w.version, nextTime) return wrt, rdr, err @@ -170,7 +170,7 @@ func (w *writer) Delete(rs *segment.RewriteSegment) (*writer, *reader, error) { } nextOffset, nextTime := w.index.getNext() - if _, ok := rs.DeletedOffsets[w.index.getLastOffset()]; ok { + if rs.DeletedMessages[len(rs.DeletedMessages)-1].Offset == w.index.getLastOffset() { rdr := openReader(w.segment, w.params, w.version, false) wrt, err := openWriter(w.segment.NewAt(nextOffset), w.params, w.version, nextTime) return wrt, rdr, err diff --git a/pkg/segment/segment.go b/pkg/segment/segment.go index b5e0908..6614362 100644 --- a/pkg/segment/segment.go +++ b/pkg/segment/segment.go @@ -286,6 +286,7 @@ func (s Segment) Migrate(mversion message.Version, iversion index.Version, param defer func() { _ = oldLog.Close() }() if oldLog.Version() == mversion { + // TODO support index only rewrites return nil } @@ -408,8 +409,9 @@ type RewriteSegment struct { Stats Stats SurviveOffsets map[int64]struct{} - DeletedOffsets map[int64]struct{} - DeletedSize int64 + // DeletedOffsets map[int64]struct{} + DeletedMessages []message.Message + DeletedSize int64 } func (r *RewriteSegment) GetNewSegment() Segment { @@ -427,7 +429,6 @@ func (s Segment) forRewrite() (*RewriteSegment, error) { Segment: s.NewAt(s.Offset), SurviveOffsets: map[int64]struct{}{}, - DeletedOffsets: map[int64]struct{}{}, } dst.Log = fmt.Sprintf("%s.rewrite.%s", s.Log, randStr) @@ -469,7 +470,7 @@ func (src Segment) Rewrite(dropOffsets map[int64]struct{}, params index.Params, } if _, ok := dropOffsets[msg.Offset]; ok { - dst.DeletedOffsets[msg.Offset] = struct{}{} + dst.DeletedMessages = append(dst.DeletedMessages, msg) dst.DeletedSize += message.Size(msg, srcVersion) + params.Size() } else { dstPosition, err := dstLog.Write(msg) diff --git a/trim_age.go b/trim_age.go index 075a900..2db8b33 100644 --- a/trim_age.go +++ b/trim_age.go @@ -64,7 +64,7 @@ SEARCH: // TrimByAge tries to remove the messages at the start of the log before given time. // // returns the offsets it deleted and the amount of storage freed -func TrimByAge(ctx context.Context, l Log, before time.Time) (map[int64]struct{}, int64, error) { +func TrimByAge(ctx context.Context, l Log, before time.Time) ([]Message, int64, error) { offsets, err := FindByAge(ctx, l, before) if err != nil { return nil, 0, err @@ -73,7 +73,7 @@ func TrimByAge(ctx context.Context, l Log, before time.Time) (map[int64]struct{} } // TrimByAgeMulti is similar to ByAge, but will try to remove messages from multiple segments -func TrimByAgeMulti(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { +func TrimByAgeMulti(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) ([]Message, int64, error) { offsets, err := FindByAge(ctx, l, before) if err != nil { return nil, 0, err diff --git a/trim_count.go b/trim_count.go index 807eab4..7297a40 100644 --- a/trim_count.go +++ b/trim_count.go @@ -55,7 +55,7 @@ func FindByCount(ctx context.Context, l Log, max int) (map[int64]struct{}, error // in the log under max count. // // returns the offsets it deleted and the amount of storage freed -func TrimByCount(ctx context.Context, l Log, max int) (map[int64]struct{}, int64, error) { +func TrimByCount(ctx context.Context, l Log, max int) ([]Message, int64, error) { offsets, err := FindByCount(ctx, l, max) if err != nil { return nil, 0, err @@ -64,7 +64,7 @@ func TrimByCount(ctx context.Context, l Log, max int) (map[int64]struct{}, int64 } // TrimByCountMulti is similar to ByCount, but will try to remove messages from multiple segments -func TrimByCountMulti(ctx context.Context, l Log, max int, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { +func TrimByCountMulti(ctx context.Context, l Log, max int, backoff DeleteMultiBackoff) ([]Message, int64, error) { offsets, err := FindByCount(ctx, l, max) if err != nil { return nil, 0, err diff --git a/trim_offset.go b/trim_offset.go index 255a792..6393aad 100644 --- a/trim_offset.go +++ b/trim_offset.go @@ -53,7 +53,7 @@ func FindByOffset(ctx context.Context, l Log, before int64) (map[int64]struct{}, // TrimByOffset tries to remove the messages at the start of the log before offset // // returns the offsets it deleted and the amount of storage freed -func TrimByOffset(ctx context.Context, l Log, before int64) (map[int64]struct{}, int64, error) { +func TrimByOffset(ctx context.Context, l Log, before int64) ([]Message, int64, error) { offsets, err := FindByOffset(ctx, l, before) if err != nil { return nil, 0, err @@ -62,7 +62,7 @@ func TrimByOffset(ctx context.Context, l Log, before int64) (map[int64]struct{}, } // TrimByOffsetMulti is similar to ByOffset, but will try to remove messages from multiple segments -func TrimByOffsetMulti(ctx context.Context, l Log, before int64, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { +func TrimByOffsetMulti(ctx context.Context, l Log, before int64, backoff DeleteMultiBackoff) ([]Message, int64, error) { offsets, err := FindByOffset(ctx, l, before) if err != nil { return nil, 0, err diff --git a/trim_size.go b/trim_size.go index 24b41f6..e642efa 100644 --- a/trim_size.go +++ b/trim_size.go @@ -53,7 +53,7 @@ func FindBySize(ctx context.Context, l Log, sz int64) (map[int64]struct{}, error // TrimBySize tries to remove messages until log size is less than sz // // returns the offsets it deleted and the amount of storage freed -func TrimBySize(ctx context.Context, l Log, sz int64) (map[int64]struct{}, int64, error) { +func TrimBySize(ctx context.Context, l Log, sz int64) ([]Message, int64, error) { offsets, err := FindBySize(ctx, l, sz) if err != nil { return nil, 0, err @@ -62,7 +62,7 @@ func TrimBySize(ctx context.Context, l Log, sz int64) (map[int64]struct{}, int64 } // TrimBySizeMulti is similar to BySize, but will try to remove messages from multiple segments -func TrimBySizeMulti(ctx context.Context, l Log, sz int64, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { +func TrimBySizeMulti(ctx context.Context, l Log, sz int64, backoff DeleteMultiBackoff) ([]Message, int64, error) { offsets, err := FindBySize(ctx, l, sz) if err != nil { return nil, 0, err diff --git a/typed.go b/typed.go index 35a6729..f537295 100644 --- a/typed.go +++ b/typed.go @@ -42,7 +42,7 @@ type TLog[K any, V any] interface { OffsetByTime(start time.Time) (offset int64, messageTime time.Time, err error) // Delete see [Log.Delete] - Delete(offsets map[int64]struct{}) (deletedOffsets map[int64]struct{}, deletedSize int64, err error) + Delete(offsets map[int64]struct{}) (deletedMessages []TMessage[K, V], deletedSize int64, err error) // Size see [Log.Size] Size(m Message) int64 @@ -179,6 +179,18 @@ func (l *tlog[K, V]) GetByTime(start time.Time) (TMessage[K, V], error) { return l.decode(msg) } +func (l *tlog[K, V]) Delete(offsets map[int64]struct{}) ([]TMessage[K, V], int64, error) { + messages, sz, err := l.Log.Delete(offsets) + tmessages := make([]TMessage[K, V], len(messages)) + for i, msg := range messages { + tmessages[i], err = l.decode(msg) + if err != nil { + return nil, sz, err + } + } + return tmessages, sz, nil +} + func (l *tlog[K, V]) Raw() Log { return l.Log } From b5f96f0b8467625a3edaa6d9690d12cbaa6e03e2 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Sat, 25 Apr 2026 16:40:19 -0400 Subject: [PATCH 2/4] cleanup and add delete only with offsets --- compact_deletes.go | 11 ++++++++++- compact_deletes_test.go | 25 ++++++++---------------- compact_updates.go | 13 +++++++++++-- compact_updates_test.go | 41 +++++++++++++++++---------------------- delete.go | 29 +++++++++++++++++++++++++++ delete_test.go | 43 ++++++++++++++++++++++++++++++++++++++--- log_test.go | 41 +++++++++++++++------------------------ trim_age.go | 11 ++++++++++- trim_age_test.go | 5 +++-- trim_count.go | 11 ++++++++++- trim_offset.go | 11 ++++++++++- trim_size.go | 11 ++++++++++- 12 files changed, 175 insertions(+), 77 deletions(-) diff --git a/compact_deletes.go b/compact_deletes.go index a7ea9a9..7ee2d82 100644 --- a/compact_deletes.go +++ b/compact_deletes.go @@ -75,7 +75,7 @@ func CompactDeletes(ctx context.Context, l Log, before time.Time) ([]Message, in return l.Delete(offsets) } -// CompactDeletesMulti is similar to Deletes, but will try to remove messages from multiple segments +// CompactDeletesMulti is similar to [CompactDeletes], but will try to remove messages from multiple segments func CompactDeletesMulti(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) ([]Message, int64, error) { offsets, err := FindDeletes(ctx, l, before) if err != nil { @@ -83,3 +83,12 @@ func CompactDeletesMulti(ctx context.Context, l Log, before time.Time, backoff D } return DeleteMulti(ctx, l, offsets, backoff) } + +// CompactDeletesMultiOffsets is similar to [CompactDeletesMulti], but only returns the deleted offsets +func CompactDeletesMultiOffsets(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { + offsets, err := FindDeletes(ctx, l, before) + if err != nil { + return nil, 0, err + } + return DeleteMultiOffsets(ctx, l, offsets, backoff) +} diff --git a/compact_deletes_test.go b/compact_deletes_test.go index c185ad9..e060ad6 100644 --- a/compact_deletes_test.go +++ b/compact_deletes_test.go @@ -19,9 +19,9 @@ func TestDeletes(t *testing.T) { require.NoError(t, err) defer l.Close() - off, cmp, err := CompactDeletes(context.TODO(), l, time.Now()) + deletedMsgs, cmp, err := CompactDeletes(context.TODO(), l, time.Now()) require.NoError(t, err) - require.Empty(t, off) + require.Empty(t, deletedMsgs) require.Equal(t, int64(0), cmp) }) @@ -33,9 +33,9 @@ func TestDeletes(t *testing.T) { _, err = l.Publish(msgs) require.NoError(t, err) - off, cmp, err := CompactDeletes(context.TODO(), l, time.Now()) + deletedMsgs, cmp, err := CompactDeletes(context.TODO(), l, time.Now()) require.NoError(t, err) - require.Empty(t, off) + require.Empty(t, deletedMsgs) require.Equal(t, int64(0), cmp) }) @@ -55,21 +55,12 @@ func TestDeletes(t *testing.T) { _, err = l.Publish(msgs) require.NoError(t, err) - msgs, cmp, err := CompactDeletes(context.TODO(), l, time.Now()) + deletedMsgs, cmp, err := CompactDeletes(context.TODO(), l, time.Now()) require.NoError(t, err) - require.Len(t, msgs, 5) - off := getOffsets(msgs) - for i := range nmsgs { - require.Contains(t, off, int64(i)) + require.Len(t, deletedMsgs, 5) + for _, nmsg := range nmsgs { + require.Contains(t, deletedMsgs, nmsg) } require.Equal(t, l.Size(nmsgs[0])*int64(len(nmsgs)), cmp) }) } - -func getOffsets(msgs []Message) map[int64]struct{} { - offsets := make(map[int64]struct{}, len(msgs)) - for _, msg := range msgs { - offsets[msg.Offset] = struct{}{} - } - return offsets -} diff --git a/compact_updates.go b/compact_updates.go index 885ff77..95dbd27 100644 --- a/compact_updates.go +++ b/compact_updates.go @@ -51,7 +51,7 @@ SEARCH: return offsets, nil } -// Updates tries to remove messages before given time that are repeated +// CompactUpdates tries to remove messages before given time that are repeated // further in the log leaving only the last message for a given key. // // This is similar to removing the old value updates, @@ -66,7 +66,7 @@ func CompactUpdates(ctx context.Context, l Log, before time.Time) ([]Message, in return l.Delete(offsets) } -// UpdatesMulti is similar to Updates, but will try to remove messages from multiple segments +// CompactUpdatesMulti is similar to [CompactUpdates], but will try to remove messages from multiple segments func CompactUpdatesMulti(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) ([]Message, int64, error) { offsets, err := FindUpdates(ctx, l, before) if err != nil { @@ -74,3 +74,12 @@ func CompactUpdatesMulti(ctx context.Context, l Log, before time.Time, backoff D } return DeleteMulti(ctx, l, offsets, backoff) } + +// CompactUpdatesMultiOffsets is similar to [CompactUpdatesMulti], but only returns the deleted offsets +func CompactUpdatesMultiOffsets(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { + offsets, err := FindUpdates(ctx, l, before) + if err != nil { + return nil, 0, err + } + return DeleteMultiOffsets(ctx, l, offsets, backoff) +} diff --git a/compact_updates_test.go b/compact_updates_test.go index 0b86a6b..58c5867 100644 --- a/compact_updates_test.go +++ b/compact_updates_test.go @@ -33,9 +33,9 @@ func TestUpdates(t *testing.T) { _, err = l.Publish(msgs) require.NoError(t, err) - off, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) + deletedMsgs, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) require.NoError(t, err) - require.Empty(t, off) + require.Empty(t, deletedMsgs) require.Equal(t, int64(0), cmp) gmsg, err := l.GetByKey(msgs[0].Key) @@ -57,10 +57,9 @@ func TestUpdates(t *testing.T) { require.NoError(t, err) deletedMsgs, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) - off := getOffsets(deletedMsgs) require.NoError(t, err) require.Len(t, deletedMsgs, 1) - require.Contains(t, off, int64(0)) + require.Contains(t, deletedMsgs, msgs[0]) require.Equal(t, l.Size(msgs[0]), cmp) gmsg, err := l.GetByKey(msgs[0].Key) @@ -82,10 +81,9 @@ func TestUpdates(t *testing.T) { require.NoError(t, err) deletedMsgs, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) - off := getOffsets(deletedMsgs) require.NoError(t, err) require.Len(t, deletedMsgs, 1) - require.Contains(t, off, int64(4)) + require.Contains(t, deletedMsgs, msgs[4]) require.Equal(t, l.Size(msgs[0]), cmp) gmsg, err := l.GetByKey(msgs[4].Key) @@ -101,21 +99,21 @@ func TestUpdates(t *testing.T) { _, err = l.Publish(msgs) require.NoError(t, err) - _, err = l.Publish(msgs) + upmsgs := slices.Clone(msgs) + _, err = l.Publish(upmsgs) require.NoError(t, err) deletedMsgs, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) - off := getOffsets(deletedMsgs) require.NoError(t, err) require.Len(t, deletedMsgs, len(msgs)) - for i := range msgs { - require.Contains(t, off, int64(i)) + for _, msg := range msgs { + require.Contains(t, deletedMsgs, msg) } require.Equal(t, l.Size(msgs[0])*int64(len(msgs)), cmp) gmsg, err := l.GetByKey(msgs[1].Key) require.NoError(t, err) - require.Equal(t, msgs[1], gmsg) + require.Equal(t, upmsgs[1], gmsg) }) t.Run("Time", func(t *testing.T) { @@ -134,11 +132,10 @@ func TestUpdates(t *testing.T) { require.NoError(t, err) deletedMsgs, cmp, err := CompactUpdates(context.TODO(), l, nmsgs[2].Time) - off := getOffsets(deletedMsgs) require.NoError(t, err) - require.Len(t, off, 3) + require.Len(t, deletedMsgs, 3) for i := range 3 { - require.Contains(t, off, int64(i)) + require.Contains(t, deletedMsgs, msgs[i]) } require.Equal(t, l.Size(msgs[0])*3, cmp) @@ -152,18 +149,16 @@ func TestUpdates(t *testing.T) { require.NoError(t, err) defer l.Close() - _, err = l.Publish([]Message{ - {Key: []byte("x")}, - {}, - {}, - }) + nmsgs := message.Gen(3) + nmsgs[1].Key = nil + nmsgs[2].Key = nil + _, err = l.Publish(nmsgs) require.NoError(t, err) deletedMsgs, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) - off := getOffsets(deletedMsgs) require.NoError(t, err) - require.Len(t, off, 1) - require.Contains(t, off, int64(1)) - require.Equal(t, l.Size(Message{}), cmp) + require.Len(t, deletedMsgs, 1) + require.Contains(t, deletedMsgs, nmsgs[1]) + require.Equal(t, l.Size(nmsgs[1]), cmp) }) } diff --git a/delete.go b/delete.go index cdb7975..bb8febc 100644 --- a/delete.go +++ b/delete.go @@ -59,3 +59,32 @@ func DeleteMulti(ctx context.Context, l Log, offsets map[int64]struct{}, backoff return deletedMessages, deletedSize, nil } + +// DeleteMultiOffsets is similar to [DeleteMulti] but will only collect offsets instead of whole messages +func DeleteMultiOffsets(ctx context.Context, l Log, offsets map[int64]struct{}, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { + var remainingOffsets = maps.Clone(offsets) + var deletedOffsets = map[int64]struct{}{} + var deletedSize int64 + + for len(remainingOffsets) > 0 { + deleted, size, err := l.Delete(remainingOffsets) + switch { + case err != nil: + return deletedOffsets, deletedSize, err + case len(deleted) == 0: + return deletedOffsets, deletedSize, nil + } + + deletedSize += size + for _, msg := range deleted { + deletedOffsets[msg.Offset] = struct{}{} + delete(remainingOffsets, msg.Offset) + } + + if err := backoff(ctx); err != nil { + return deletedOffsets, deletedSize, err + } + } + + return deletedOffsets, deletedSize, nil +} diff --git a/delete_test.go b/delete_test.go index b858916..e7b47a4 100644 --- a/delete_test.go +++ b/delete_test.go @@ -28,15 +28,52 @@ func TestDeleteMulti(t *testing.T) { require.Equal(t, 10, stats.Messages) require.Equal(t, 5, stats.Segments) - msgs, sz, err := DeleteMulti(context.TODO(), l, map[int64]struct{}{ + deletedMsgs, sz, err := DeleteMulti(context.TODO(), l, map[int64]struct{}{ 0: {}, 2: {}, 3: {}, 4: {}, }, DeleteMultiWithWait(time.Millisecond)) require.NoError(t, err) - require.Len(t, msgs, 4) - offsets := getOffsets(msgs) + require.Len(t, deletedMsgs, 4) + require.Contains(t, deletedMsgs, msgs[0]) + require.Contains(t, deletedMsgs, msgs[2]) + require.Contains(t, deletedMsgs, msgs[3]) + require.Contains(t, deletedMsgs, msgs[4]) + require.Equal(t, l.Size(msgs[0])*4, sz) + + stats, err = l.Stat() + require.NoError(t, err) + require.Equal(t, 6, stats.Messages) + require.Equal(t, 4, stats.Segments) +} + +func TestDeleteMultiOffsets(t *testing.T) { + msgs := message.Gen(10) + + l, err := Open(t.TempDir(), Options{ + TimeIndex: true, + KeyIndex: true, + Rollover: 2 * (message.Size(msgs[0], message.V2) - 1), + }) + require.NoError(t, err) + defer l.Close() + + publishBatched(t, l, msgs, 1) + + stats, err := l.Stat() + require.NoError(t, err) + require.Equal(t, 10, stats.Messages) + require.Equal(t, 5, stats.Segments) + + offsets, sz, err := DeleteMultiOffsets(context.TODO(), l, map[int64]struct{}{ + 0: {}, + 2: {}, + 3: {}, + 4: {}, + }, DeleteMultiWithWait(time.Millisecond)) + require.NoError(t, err) + require.Len(t, offsets, 4) require.Contains(t, offsets, int64(0)) require.Contains(t, offsets, int64(2)) require.Contains(t, offsets, int64(3)) diff --git a/log_test.go b/log_test.go index 1b0f812..eb559a0 100644 --- a/log_test.go +++ b/log_test.go @@ -1158,10 +1158,9 @@ func testDeleteReaderPartial(t *testing.T) { deletedMsgs, sz, err := l.Delete(map[int64]struct{}{ 0: {}, }) - offsets := getOffsets(deletedMsgs) require.NoError(t, err) require.Len(t, deletedMsgs, 1) - require.Contains(t, offsets, int64(0)) + require.Contains(t, deletedMsgs, msgs[0]) require.Equal(t, l.Size(msgs[0]), sz) stats, err = l.Stat() @@ -1212,10 +1211,9 @@ func testDeleteReaderPartialReload(t *testing.T) { deletedMsgs, sz, err := l.Delete(map[int64]struct{}{ 0: {}, }) - offsets := getOffsets(deletedMsgs) require.NoError(t, err) - require.Len(t, offsets, 1) - require.Contains(t, offsets, int64(0)) + require.Len(t, deletedMsgs, 1) + require.Contains(t, deletedMsgs, msgs[0]) require.Equal(t, l.Size(msgs[0]), sz) stats, err = l.Stat() @@ -1257,11 +1255,10 @@ func testDeleteReaderFull(t *testing.T) { 0: {}, 1: {}, }) - offsets := getOffsets(deletedMsgs) require.NoError(t, err) require.Len(t, deletedMsgs, 2) - require.Contains(t, offsets, int64(0)) - require.Contains(t, offsets, int64(1)) + require.Contains(t, deletedMsgs, msgs[0]) + require.Contains(t, deletedMsgs, msgs[1]) require.Equal(t, l.Size(msgs[0])*2, sz) stats, err = l.Stat() @@ -1297,11 +1294,10 @@ func testDeleteWriterSingle(t *testing.T) { 0: {}, 1: {}, }) - offsets := getOffsets(deletedMsgs) require.NoError(t, err) require.Len(t, deletedMsgs, 2) - require.Contains(t, offsets, int64(0)) - require.Contains(t, offsets, int64(1)) + require.Contains(t, deletedMsgs, msgs[0]) + require.Contains(t, deletedMsgs, msgs[1]) require.Equal(t, l.Size(msgs[0])*2, sz) stats, err = l.Stat() @@ -1339,10 +1335,9 @@ func testDeleteWriterLast(t *testing.T) { deletedMsgs, sz, err := l.Delete(map[int64]struct{}{ 3: {}, }) - offsets := getOffsets(deletedMsgs) require.NoError(t, err) - require.Len(t, offsets, 1) - require.Contains(t, offsets, int64(3)) + require.Len(t, deletedMsgs, 1) + require.Contains(t, deletedMsgs, msgs[3]) require.Equal(t, l.Size(msgs[0]), sz) stats, err := l.Stat() @@ -1386,10 +1381,9 @@ func testDeleteWriterPartial(t *testing.T) { deletedMsgs, sz, err := l.Delete(map[int64]struct{}{ 2: {}, }) - offsets := getOffsets(deletedMsgs) require.NoError(t, err) require.Len(t, deletedMsgs, 1) - require.Contains(t, offsets, int64(2)) + require.Contains(t, deletedMsgs, msgs[2]) require.Equal(t, l.Size(msgs[0]), sz) stats, err = l.Stat() @@ -1433,11 +1427,10 @@ func testDeleteWriterFull(t *testing.T) { 2: {}, 3: {}, }) - offsets := getOffsets(deletedMsgs) require.NoError(t, err) require.Len(t, deletedMsgs, 2) - require.Contains(t, offsets, int64(2)) - require.Contains(t, offsets, int64(3)) + require.Contains(t, deletedMsgs, msgs[2]) + require.Contains(t, deletedMsgs, msgs[3]) require.Equal(t, l.Size(msgs[0])*2, sz) stats, err = l.Stat() @@ -1483,11 +1476,10 @@ func testDeleteAll(t *testing.T) { 2: {}, 3: {}, }) - offsets := getOffsets(deletedMsgs) require.NoError(t, err) require.Len(t, deletedMsgs, 2) - require.Contains(t, offsets, int64(2)) - require.Contains(t, offsets, int64(3)) + require.Contains(t, deletedMsgs, msgs[2]) + require.Contains(t, deletedMsgs, msgs[3]) require.Equal(t, l.Size(msgs[0])*2, sz) // delete the reader segment @@ -1495,11 +1487,10 @@ func testDeleteAll(t *testing.T) { 0: {}, 1: {}, }) - offsets = getOffsets(deletedMsgs) require.NoError(t, err) require.Len(t, deletedMsgs, 2) - require.Contains(t, offsets, int64(0)) - require.Contains(t, offsets, int64(1)) + require.Contains(t, deletedMsgs, msgs[0]) + require.Contains(t, deletedMsgs, msgs[1]) require.Equal(t, l.Size(msgs[0])*2, sz) stats, err := l.Stat() diff --git a/trim_age.go b/trim_age.go index 2db8b33..8c58b28 100644 --- a/trim_age.go +++ b/trim_age.go @@ -72,7 +72,7 @@ func TrimByAge(ctx context.Context, l Log, before time.Time) ([]Message, int64, return l.Delete(offsets) } -// TrimByAgeMulti is similar to ByAge, but will try to remove messages from multiple segments +// TrimByAgeMulti is similar to [TrimByAge], but will try to remove messages from multiple segments func TrimByAgeMulti(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) ([]Message, int64, error) { offsets, err := FindByAge(ctx, l, before) if err != nil { @@ -80,3 +80,12 @@ func TrimByAgeMulti(ctx context.Context, l Log, before time.Time, backoff Delete } return DeleteMulti(ctx, l, offsets, backoff) } + +// TrimByAgeMultiOffsets is similar to [TrimByAgeMulti], but only returns the deleted offsets +func TrimByAgeMultiOffsets(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { + offsets, err := FindByAge(ctx, l, before) + if err != nil { + return nil, 0, err + } + return DeleteMultiOffsets(ctx, l, offsets, backoff) +} diff --git a/trim_age_test.go b/trim_age_test.go index f2e9735..66587c9 100644 --- a/trim_age_test.go +++ b/trim_age_test.go @@ -75,9 +75,10 @@ func testByAgeAll(t *testing.T) { require.NoError(t, err) trimTime := msgs[len(msgs)-1].Time.Add(time.Millisecond) - off, sz, err := TrimByAge(context.TODO(), l, trimTime) + deletedMsgs, sz, err := TrimByAge(context.TODO(), l, trimTime) require.NoError(t, err) - require.Len(t, off, 20) + require.Len(t, deletedMsgs, 20) + require.Equal(t, msgs, deletedMsgs) require.Equal(t, l.Size(msgs[0])*20, sz) coff, cmsgs, err := l.Consume(OffsetOldest, 32) diff --git a/trim_count.go b/trim_count.go index 7297a40..2110b37 100644 --- a/trim_count.go +++ b/trim_count.go @@ -63,7 +63,7 @@ func TrimByCount(ctx context.Context, l Log, max int) ([]Message, int64, error) return l.Delete(offsets) } -// TrimByCountMulti is similar to ByCount, but will try to remove messages from multiple segments +// TrimByCountMulti is similar to [TrimByCount], but will try to remove messages from multiple segments func TrimByCountMulti(ctx context.Context, l Log, max int, backoff DeleteMultiBackoff) ([]Message, int64, error) { offsets, err := FindByCount(ctx, l, max) if err != nil { @@ -71,3 +71,12 @@ func TrimByCountMulti(ctx context.Context, l Log, max int, backoff DeleteMultiBa } return DeleteMulti(ctx, l, offsets, backoff) } + +// TrimByCountMultiOffsets is similar to [TrimByCountMulti], but only returns the deleted offsets +func TrimByCountMultiOffsets(ctx context.Context, l Log, max int, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { + offsets, err := FindByCount(ctx, l, max) + if err != nil { + return nil, 0, err + } + return DeleteMultiOffsets(ctx, l, offsets, backoff) +} diff --git a/trim_offset.go b/trim_offset.go index 6393aad..70ed19d 100644 --- a/trim_offset.go +++ b/trim_offset.go @@ -61,7 +61,7 @@ func TrimByOffset(ctx context.Context, l Log, before int64) ([]Message, int64, e return l.Delete(offsets) } -// TrimByOffsetMulti is similar to ByOffset, but will try to remove messages from multiple segments +// TrimByOffsetMulti is similar to [TrimByOffset], but will try to remove messages from multiple segments func TrimByOffsetMulti(ctx context.Context, l Log, before int64, backoff DeleteMultiBackoff) ([]Message, int64, error) { offsets, err := FindByOffset(ctx, l, before) if err != nil { @@ -69,3 +69,12 @@ func TrimByOffsetMulti(ctx context.Context, l Log, before int64, backoff DeleteM } return DeleteMulti(ctx, l, offsets, backoff) } + +// TrimByOffsetMultiOffsets is similar to [TrimByOffsetMulti], but only returns the deleted offsets +func TrimByOffsetMultiOffsets(ctx context.Context, l Log, before int64, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { + offsets, err := FindByOffset(ctx, l, before) + if err != nil { + return nil, 0, err + } + return DeleteMultiOffsets(ctx, l, offsets, backoff) +} diff --git a/trim_size.go b/trim_size.go index e642efa..bcc4178 100644 --- a/trim_size.go +++ b/trim_size.go @@ -61,7 +61,7 @@ func TrimBySize(ctx context.Context, l Log, sz int64) ([]Message, int64, error) return l.Delete(offsets) } -// TrimBySizeMulti is similar to BySize, but will try to remove messages from multiple segments +// TrimBySizeMulti is similar to [TrimBySize], but will try to remove messages from multiple segments func TrimBySizeMulti(ctx context.Context, l Log, sz int64, backoff DeleteMultiBackoff) ([]Message, int64, error) { offsets, err := FindBySize(ctx, l, sz) if err != nil { @@ -69,3 +69,12 @@ func TrimBySizeMulti(ctx context.Context, l Log, sz int64, backoff DeleteMultiBa } return DeleteMulti(ctx, l, offsets, backoff) } + +// TrimBySizeMultiOffsets is similar to [TrimBySizeMulti], but only returns the deleted offsets +func TrimBySizeMultiOffsets(ctx context.Context, l Log, sz int64, backoff DeleteMultiBackoff) ([]Message, int64, error) { + offsets, err := FindBySize(ctx, l, sz) + if err != nil { + return nil, 0, err + } + return DeleteMulti(ctx, l, offsets, backoff) +} From 6a3d595b64ba2d1dcf2442591642a5c29bf5cd20 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Sat, 25 Apr 2026 16:43:32 -0400 Subject: [PATCH 3/4] linting fixes --- typed.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/typed.go b/typed.go index f537295..2dda564 100644 --- a/typed.go +++ b/typed.go @@ -181,6 +181,10 @@ func (l *tlog[K, V]) GetByTime(start time.Time) (TMessage[K, V], error) { func (l *tlog[K, V]) Delete(offsets map[int64]struct{}) ([]TMessage[K, V], int64, error) { messages, sz, err := l.Log.Delete(offsets) + if err != nil { + return nil, 0, err + } + tmessages := make([]TMessage[K, V], len(messages)) for i, msg := range messages { tmessages[i], err = l.decode(msg) From c8ae4ae000e8712fb1b04d7831953c9f9eb8fe66 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Sat, 25 Apr 2026 16:50:30 -0400 Subject: [PATCH 4/4] fix bench makefile --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 2d077dc..be867f1 100644 --- a/Makefile +++ b/Makefile @@ -28,7 +28,7 @@ bench-get: go test -bench=BenchmarkSingle/Get/V2 -benchmem -run XXX bench-multi: - go test -bench=BenchmarkMulti/V2 -benchmem -run XXX + go test -bench=BenchmarkMulti//V2 -benchmem -run XXX bench-keys: go test -bench=BenchmarkKeys -benchmem -run XXX