diff --git a/Makefile b/Makefile index 2d077dc..be867f1 100644 --- a/Makefile +++ b/Makefile @@ -28,7 +28,7 @@ bench-get: go test -bench=BenchmarkSingle/Get/V2 -benchmem -run XXX bench-multi: - go test -bench=BenchmarkMulti/V2 -benchmem -run XXX + go test -bench=BenchmarkMulti//V2 -benchmem -run XXX bench-keys: go test -bench=BenchmarkKeys -benchmem -run XXX diff --git a/api.go b/api.go index b2e0b4c..92c8872 100644 --- a/api.go +++ b/api.go @@ -145,8 +145,8 @@ type Log interface { // Delete tries to delete a set of messages by their offset // from the log and returns the amount of storage deleted // It does not guarantee that it will delete all messages, - // it returns the set of actually deleted offsets. - Delete(offsets map[int64]struct{}) (deletedOffsets map[int64]struct{}, deletedSize int64, err error) + // it returns list of actually deleted messages. + Delete(offsets map[int64]struct{}) (deletedMessages []Message, deletedSize int64, err error) // Size returns the amount of storage a message occupies in the // NewSegmentsVersion format (see VersionOptions), plus the index overhead. @@ -201,6 +201,7 @@ func Recover(dir string, opts Options) error { }) } +// Migrate rewrites all segments with a concrete options and version func Migrate(dir string, opts Options, version Version) error { if version == vUnknown { return fmt.Errorf("migrate: version must be specified (e.g. klevdb.V2)") diff --git a/compact_deletes.go b/compact_deletes.go index 0cee06b..7ee2d82 100644 --- a/compact_deletes.go +++ b/compact_deletes.go @@ -67,7 +67,7 @@ SEARCH: // and are therefore no longer relevant/active. // // returns the offsets it deleted and the amount of storage freed -func CompactDeletes(ctx context.Context, l Log, before time.Time) (map[int64]struct{}, int64, error) { +func CompactDeletes(ctx context.Context, l Log, before time.Time) ([]Message, int64, error) { offsets, err := FindDeletes(ctx, l, before) if err != nil { return nil, 0, err @@ -75,11 +75,20 @@ func CompactDeletes(ctx context.Context, l Log, before time.Time) (map[int64]str return l.Delete(offsets) } -// CompactDeletesMulti is similar to Deletes, but will try to remove messages from multiple segments -func CompactDeletesMulti(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { +// CompactDeletesMulti is similar to [CompactDeletes], but will try to remove messages from multiple segments +func CompactDeletesMulti(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) ([]Message, int64, error) { offsets, err := FindDeletes(ctx, l, before) if err != nil { return nil, 0, err } return DeleteMulti(ctx, l, offsets, backoff) } + +// CompactDeletesMultiOffsets is similar to [CompactDeletesMulti], but only returns the deleted offsets +func CompactDeletesMultiOffsets(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { + offsets, err := FindDeletes(ctx, l, before) + if err != nil { + return nil, 0, err + } + return DeleteMultiOffsets(ctx, l, offsets, backoff) +} diff --git a/compact_deletes_test.go b/compact_deletes_test.go index f88b69b..e060ad6 100644 --- a/compact_deletes_test.go +++ b/compact_deletes_test.go @@ -19,9 +19,9 @@ func TestDeletes(t *testing.T) { require.NoError(t, err) defer l.Close() - off, cmp, err := CompactDeletes(context.TODO(), l, time.Now()) + deletedMsgs, cmp, err := CompactDeletes(context.TODO(), l, time.Now()) require.NoError(t, err) - require.Empty(t, off) + require.Empty(t, deletedMsgs) require.Equal(t, int64(0), cmp) }) @@ -33,9 +33,9 @@ func TestDeletes(t *testing.T) { _, err = l.Publish(msgs) require.NoError(t, err) - off, cmp, err := CompactDeletes(context.TODO(), l, time.Now()) + deletedMsgs, cmp, err := CompactDeletes(context.TODO(), l, time.Now()) require.NoError(t, err) - require.Empty(t, off) + require.Empty(t, deletedMsgs) require.Equal(t, int64(0), cmp) }) @@ -55,11 +55,11 @@ func TestDeletes(t *testing.T) { _, err = l.Publish(msgs) require.NoError(t, err) - off, cmp, err := CompactDeletes(context.TODO(), l, time.Now()) + deletedMsgs, cmp, err := CompactDeletes(context.TODO(), l, time.Now()) require.NoError(t, err) - require.Len(t, off, 5) - for i := range nmsgs { - require.Contains(t, off, int64(i)) + require.Len(t, deletedMsgs, 5) + for _, nmsg := range nmsgs { + require.Contains(t, deletedMsgs, nmsg) } require.Equal(t, l.Size(nmsgs[0])*int64(len(nmsgs)), cmp) }) diff --git a/compact_updates.go b/compact_updates.go index 51c315e..95dbd27 100644 --- a/compact_updates.go +++ b/compact_updates.go @@ -51,14 +51,14 @@ SEARCH: return offsets, nil } -// Updates tries to remove messages before given time that are repeated +// CompactUpdates tries to remove messages before given time that are repeated // further in the log leaving only the last message for a given key. // // This is similar to removing the old value updates, // leaving only the current value (last update) for a key. // // returns the offsets it deleted and the amount of storage freed -func CompactUpdates(ctx context.Context, l Log, before time.Time) (map[int64]struct{}, int64, error) { +func CompactUpdates(ctx context.Context, l Log, before time.Time) ([]Message, int64, error) { offsets, err := FindUpdates(ctx, l, before) if err != nil { return nil, 0, err @@ -66,11 +66,20 @@ func CompactUpdates(ctx context.Context, l Log, before time.Time) (map[int64]str return l.Delete(offsets) } -// UpdatesMulti is similar to Updates, but will try to remove messages from multiple segments -func CompactUpdatesMulti(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { +// CompactUpdatesMulti is similar to [CompactUpdates], but will try to remove messages from multiple segments +func CompactUpdatesMulti(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) ([]Message, int64, error) { offsets, err := FindUpdates(ctx, l, before) if err != nil { return nil, 0, err } return DeleteMulti(ctx, l, offsets, backoff) } + +// CompactUpdatesMultiOffsets is similar to [CompactUpdatesMulti], but only returns the deleted offsets +func CompactUpdatesMultiOffsets(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { + offsets, err := FindUpdates(ctx, l, before) + if err != nil { + return nil, 0, err + } + return DeleteMultiOffsets(ctx, l, offsets, backoff) +} diff --git a/compact_updates_test.go b/compact_updates_test.go index 4b744f9..58c5867 100644 --- a/compact_updates_test.go +++ b/compact_updates_test.go @@ -33,9 +33,9 @@ func TestUpdates(t *testing.T) { _, err = l.Publish(msgs) require.NoError(t, err) - off, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) + deletedMsgs, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) require.NoError(t, err) - require.Empty(t, off) + require.Empty(t, deletedMsgs) require.Equal(t, int64(0), cmp) gmsg, err := l.GetByKey(msgs[0].Key) @@ -56,10 +56,10 @@ func TestUpdates(t *testing.T) { _, err = l.Publish(dmsgs) require.NoError(t, err) - off, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) + deletedMsgs, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) require.NoError(t, err) - require.Len(t, off, 1) - require.Contains(t, off, int64(0)) + require.Len(t, deletedMsgs, 1) + require.Contains(t, deletedMsgs, msgs[0]) require.Equal(t, l.Size(msgs[0]), cmp) gmsg, err := l.GetByKey(msgs[0].Key) @@ -80,10 +80,10 @@ func TestUpdates(t *testing.T) { _, err = l.Publish(dmsgs) require.NoError(t, err) - off, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) + deletedMsgs, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) require.NoError(t, err) - require.Len(t, off, 1) - require.Contains(t, off, int64(4)) + require.Len(t, deletedMsgs, 1) + require.Contains(t, deletedMsgs, msgs[4]) require.Equal(t, l.Size(msgs[0]), cmp) gmsg, err := l.GetByKey(msgs[4].Key) @@ -99,20 +99,21 @@ func TestUpdates(t *testing.T) { _, err = l.Publish(msgs) require.NoError(t, err) - _, err = l.Publish(msgs) + upmsgs := slices.Clone(msgs) + _, err = l.Publish(upmsgs) require.NoError(t, err) - off, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) + deletedMsgs, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) require.NoError(t, err) - require.Len(t, off, len(msgs)) - for i := range msgs { - require.Contains(t, off, int64(i)) + require.Len(t, deletedMsgs, len(msgs)) + for _, msg := range msgs { + require.Contains(t, deletedMsgs, msg) } require.Equal(t, l.Size(msgs[0])*int64(len(msgs)), cmp) gmsg, err := l.GetByKey(msgs[1].Key) require.NoError(t, err) - require.Equal(t, msgs[1], gmsg) + require.Equal(t, upmsgs[1], gmsg) }) t.Run("Time", func(t *testing.T) { @@ -130,11 +131,11 @@ func TestUpdates(t *testing.T) { _, err = l.Publish(nmsgs) require.NoError(t, err) - off, cmp, err := CompactUpdates(context.TODO(), l, nmsgs[2].Time) + deletedMsgs, cmp, err := CompactUpdates(context.TODO(), l, nmsgs[2].Time) require.NoError(t, err) - require.Len(t, off, 3) + require.Len(t, deletedMsgs, 3) for i := range 3 { - require.Contains(t, off, int64(i)) + require.Contains(t, deletedMsgs, msgs[i]) } require.Equal(t, l.Size(msgs[0])*3, cmp) @@ -148,17 +149,16 @@ func TestUpdates(t *testing.T) { require.NoError(t, err) defer l.Close() - _, err = l.Publish([]Message{ - {Key: []byte("x")}, - {}, - {}, - }) + nmsgs := message.Gen(3) + nmsgs[1].Key = nil + nmsgs[2].Key = nil + _, err = l.Publish(nmsgs) require.NoError(t, err) - off, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) + deletedMsgs, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) require.NoError(t, err) - require.Len(t, off, 1) - require.Contains(t, off, int64(1)) - require.Equal(t, l.Size(Message{}), cmp) + require.Len(t, deletedMsgs, 1) + require.Contains(t, deletedMsgs, nmsgs[1]) + require.Equal(t, l.Size(nmsgs[1]), cmp) }) } diff --git a/delete.go b/delete.go index d2e821f..bb8febc 100644 --- a/delete.go +++ b/delete.go @@ -32,7 +32,36 @@ func DeleteMultiWithWait(d time.Duration) DeleteMultiBackoff { // // [DeleteMultiBackoff] is called on each iteration to give // others a chance to work with the log, while being deleted -func DeleteMulti(ctx context.Context, l Log, offsets map[int64]struct{}, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { +func DeleteMulti(ctx context.Context, l Log, offsets map[int64]struct{}, backoff DeleteMultiBackoff) ([]Message, int64, error) { + var remainingOffsets = maps.Clone(offsets) + var deletedMessages []Message + var deletedSize int64 + + for len(remainingOffsets) > 0 { + deleted, size, err := l.Delete(remainingOffsets) + switch { + case err != nil: + return deletedMessages, deletedSize, err + case len(deleted) == 0: + return deletedMessages, deletedSize, nil + } + + deletedMessages = append(deletedMessages, deleted...) + deletedSize += size + for _, msg := range deleted { + delete(remainingOffsets, msg.Offset) + } + + if err := backoff(ctx); err != nil { + return deletedMessages, deletedSize, err + } + } + + return deletedMessages, deletedSize, nil +} + +// DeleteMultiOffsets is similar to [DeleteMulti] but will only collect offsets instead of whole messages +func DeleteMultiOffsets(ctx context.Context, l Log, offsets map[int64]struct{}, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { var remainingOffsets = maps.Clone(offsets) var deletedOffsets = map[int64]struct{}{} var deletedSize int64 @@ -46,12 +75,11 @@ func DeleteMulti(ctx context.Context, l Log, offsets map[int64]struct{}, backoff return deletedOffsets, deletedSize, nil } - maps.Copy(deletedOffsets, deleted) deletedSize += size - maps.DeleteFunc(remainingOffsets, func(k int64, v struct{}) bool { - _, ok := deleted[k] - return ok - }) + for _, msg := range deleted { + deletedOffsets[msg.Offset] = struct{}{} + delete(remainingOffsets, msg.Offset) + } if err := backoff(ctx); err != nil { return deletedOffsets, deletedSize, err diff --git a/delete_test.go b/delete_test.go index 9295dd1..e7b47a4 100644 --- a/delete_test.go +++ b/delete_test.go @@ -28,7 +28,45 @@ func TestDeleteMulti(t *testing.T) { require.Equal(t, 10, stats.Messages) require.Equal(t, 5, stats.Segments) - offsets, sz, err := DeleteMulti(context.TODO(), l, map[int64]struct{}{ + deletedMsgs, sz, err := DeleteMulti(context.TODO(), l, map[int64]struct{}{ + 0: {}, + 2: {}, + 3: {}, + 4: {}, + }, DeleteMultiWithWait(time.Millisecond)) + require.NoError(t, err) + require.Len(t, deletedMsgs, 4) + require.Contains(t, deletedMsgs, msgs[0]) + require.Contains(t, deletedMsgs, msgs[2]) + require.Contains(t, deletedMsgs, msgs[3]) + require.Contains(t, deletedMsgs, msgs[4]) + require.Equal(t, l.Size(msgs[0])*4, sz) + + stats, err = l.Stat() + require.NoError(t, err) + require.Equal(t, 6, stats.Messages) + require.Equal(t, 4, stats.Segments) +} + +func TestDeleteMultiOffsets(t *testing.T) { + msgs := message.Gen(10) + + l, err := Open(t.TempDir(), Options{ + TimeIndex: true, + KeyIndex: true, + Rollover: 2 * (message.Size(msgs[0], message.V2) - 1), + }) + require.NoError(t, err) + defer l.Close() + + publishBatched(t, l, msgs, 1) + + stats, err := l.Stat() + require.NoError(t, err) + require.Equal(t, 10, stats.Messages) + require.Equal(t, 5, stats.Segments) + + offsets, sz, err := DeleteMultiOffsets(context.TODO(), l, map[int64]struct{}{ 0: {}, 2: {}, 3: {}, diff --git a/log.go b/log.go index 7c35362..853f0c1 100644 --- a/log.go +++ b/log.go @@ -358,7 +358,7 @@ func (l *log) OffsetByTime(start time.Time) (int64, time.Time, error) { return msg.Offset, msg.Time, nil } -func (l *log) Delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, error) { +func (l *log) Delete(offsets map[int64]struct{}) ([]Message, int64, error) { if l.opts.Readonly { return nil, 0, ErrReadonly } @@ -373,7 +373,7 @@ func (l *log) Delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, err return l.delete(offsets) } -func (l *log) delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, error) { +func (l *log) delete(offsets map[int64]struct{}) ([]Message, int64, error) { rdr, err := l.findDeleteReader(offsets) if err != nil { return nil, 0, err @@ -418,7 +418,7 @@ func (l *log) delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, err return nil, 0, err } - if len(rs.DeletedOffsets) == 0 { + if len(rs.DeletedMessages) == 0 { // deleted nothing, just remove rewrite files return nil, 0, rs.Remove() } @@ -447,7 +447,7 @@ func (l *log) delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, err l.readers = append(l.readers, newWriter.reader) } - return rs.DeletedOffsets, rs.DeletedSize, nil + return rs.DeletedMessages, rs.DeletedSize, nil } l.writerMu.Unlock() @@ -480,7 +480,7 @@ func (l *log) delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, err } l.readers = newReaders - return rs.DeletedOffsets, rs.DeletedSize, nil + return rs.DeletedMessages, rs.DeletedSize, nil } func (l *log) findDeleteReader(offsets map[int64]struct{}) (*reader, error) { diff --git a/log_test.go b/log_test.go index 10afbe6..eb559a0 100644 --- a/log_test.go +++ b/log_test.go @@ -1155,12 +1155,12 @@ func testDeleteReaderPartial(t *testing.T) { require.Equal(t, 4, stats.Messages) require.Equal(t, 2, stats.Segments) - offsets, sz, err := l.Delete(map[int64]struct{}{ + deletedMsgs, sz, err := l.Delete(map[int64]struct{}{ 0: {}, }) require.NoError(t, err) - require.Len(t, offsets, 1) - require.Contains(t, offsets, int64(0)) + require.Len(t, deletedMsgs, 1) + require.Contains(t, deletedMsgs, msgs[0]) require.Equal(t, l.Size(msgs[0]), sz) stats, err = l.Stat() @@ -1208,12 +1208,12 @@ func testDeleteReaderPartialReload(t *testing.T) { require.Equal(t, 4, stats.Messages) require.Equal(t, 2, stats.Segments) - offsets, sz, err := l.Delete(map[int64]struct{}{ + deletedMsgs, sz, err := l.Delete(map[int64]struct{}{ 0: {}, }) require.NoError(t, err) - require.Len(t, offsets, 1) - require.Contains(t, offsets, int64(0)) + require.Len(t, deletedMsgs, 1) + require.Contains(t, deletedMsgs, msgs[0]) require.Equal(t, l.Size(msgs[0]), sz) stats, err = l.Stat() @@ -1251,14 +1251,14 @@ func testDeleteReaderFull(t *testing.T) { require.Equal(t, 4, stats.Messages) require.Equal(t, 2, stats.Segments) - offsets, sz, err := l.Delete(map[int64]struct{}{ + deletedMsgs, sz, err := l.Delete(map[int64]struct{}{ 0: {}, 1: {}, }) require.NoError(t, err) - require.Len(t, offsets, 2) - require.Contains(t, offsets, int64(0)) - require.Contains(t, offsets, int64(1)) + require.Len(t, deletedMsgs, 2) + require.Contains(t, deletedMsgs, msgs[0]) + require.Contains(t, deletedMsgs, msgs[1]) require.Equal(t, l.Size(msgs[0])*2, sz) stats, err = l.Stat() @@ -1290,14 +1290,14 @@ func testDeleteWriterSingle(t *testing.T) { require.Equal(t, 4, stats.Messages) require.Equal(t, 1, stats.Segments) - offsets, sz, err := l.Delete(map[int64]struct{}{ + deletedMsgs, sz, err := l.Delete(map[int64]struct{}{ 0: {}, 1: {}, }) require.NoError(t, err) - require.Len(t, offsets, 2) - require.Contains(t, offsets, int64(0)) - require.Contains(t, offsets, int64(1)) + require.Len(t, deletedMsgs, 2) + require.Contains(t, deletedMsgs, msgs[0]) + require.Contains(t, deletedMsgs, msgs[1]) require.Equal(t, l.Size(msgs[0])*2, sz) stats, err = l.Stat() @@ -1332,12 +1332,12 @@ func testDeleteWriterLast(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(4), nextOffset) - offsets, sz, err := l.Delete(map[int64]struct{}{ + deletedMsgs, sz, err := l.Delete(map[int64]struct{}{ 3: {}, }) require.NoError(t, err) - require.Len(t, offsets, 1) - require.Contains(t, offsets, int64(3)) + require.Len(t, deletedMsgs, 1) + require.Contains(t, deletedMsgs, msgs[3]) require.Equal(t, l.Size(msgs[0]), sz) stats, err := l.Stat() @@ -1378,12 +1378,12 @@ func testDeleteWriterPartial(t *testing.T) { require.Equal(t, 4, stats.Messages) require.Equal(t, 2, stats.Segments) - offsets, sz, err := l.Delete(map[int64]struct{}{ + deletedMsgs, sz, err := l.Delete(map[int64]struct{}{ 2: {}, }) require.NoError(t, err) - require.Len(t, offsets, 1) - require.Contains(t, offsets, int64(2)) + require.Len(t, deletedMsgs, 1) + require.Contains(t, deletedMsgs, msgs[2]) require.Equal(t, l.Size(msgs[0]), sz) stats, err = l.Stat() @@ -1423,14 +1423,14 @@ func testDeleteWriterFull(t *testing.T) { require.Equal(t, 4, stats.Messages) require.Equal(t, 2, stats.Segments) - offsets, sz, err := l.Delete(map[int64]struct{}{ + deletedMsgs, sz, err := l.Delete(map[int64]struct{}{ 2: {}, 3: {}, }) require.NoError(t, err) - require.Len(t, offsets, 2) - require.Contains(t, offsets, int64(2)) - require.Contains(t, offsets, int64(3)) + require.Len(t, deletedMsgs, 2) + require.Contains(t, deletedMsgs, msgs[2]) + require.Contains(t, deletedMsgs, msgs[3]) require.Equal(t, l.Size(msgs[0])*2, sz) stats, err = l.Stat() @@ -1472,25 +1472,25 @@ func testDeleteAll(t *testing.T) { publishBatched(t, l, msgs, 1) // delete the writer segment - offsets, sz, err := l.Delete(map[int64]struct{}{ + deletedMsgs, sz, err := l.Delete(map[int64]struct{}{ 2: {}, 3: {}, }) require.NoError(t, err) - require.Len(t, offsets, 2) - require.Contains(t, offsets, int64(2)) - require.Contains(t, offsets, int64(3)) + require.Len(t, deletedMsgs, 2) + require.Contains(t, deletedMsgs, msgs[2]) + require.Contains(t, deletedMsgs, msgs[3]) require.Equal(t, l.Size(msgs[0])*2, sz) // delete the reader segment - offsets, sz, err = l.Delete(map[int64]struct{}{ + deletedMsgs, sz, err = l.Delete(map[int64]struct{}{ 0: {}, 1: {}, }) require.NoError(t, err) - require.Len(t, offsets, 2) - require.Contains(t, offsets, int64(0)) - require.Contains(t, offsets, int64(1)) + require.Len(t, deletedMsgs, 2) + require.Contains(t, deletedMsgs, msgs[0]) + require.Contains(t, deletedMsgs, msgs[1]) require.Equal(t, l.Size(msgs[0])*2, sz) stats, err := l.Stat() diff --git a/log_writer.go b/log_writer.go index b279352..0427d15 100644 --- a/log_writer.go +++ b/log_writer.go @@ -112,7 +112,7 @@ func (w *writer) Delete(rs *segment.RewriteSegment) (*writer, *reader, error) { return nil, nil, err } - if len(rs.SurviveOffsets)+len(rs.DeletedOffsets) != w.index.Len() { + if len(rs.SurviveOffsets)+len(rs.DeletedMessages) != w.index.Len() { // the number of messages changed, nothing to drop if err := rs.Remove(); err != nil { return nil, nil, err @@ -155,7 +155,7 @@ func (w *writer) Delete(rs *segment.RewriteSegment) (*writer, *reader, error) { // first move the replacement nextOffset, nextTime := w.index.getNext() - if _, ok := rs.DeletedOffsets[w.index.getLastOffset()]; ok { + if rs.DeletedMessages[len(rs.DeletedMessages)-1].Offset == w.index.getLastOffset() { rdr := openReader(nseg, w.params, w.version, false) wrt, err := openWriter(w.segment.NewAt(nextOffset), w.params, w.version, nextTime) return wrt, rdr, err @@ -170,7 +170,7 @@ func (w *writer) Delete(rs *segment.RewriteSegment) (*writer, *reader, error) { } nextOffset, nextTime := w.index.getNext() - if _, ok := rs.DeletedOffsets[w.index.getLastOffset()]; ok { + if rs.DeletedMessages[len(rs.DeletedMessages)-1].Offset == w.index.getLastOffset() { rdr := openReader(w.segment, w.params, w.version, false) wrt, err := openWriter(w.segment.NewAt(nextOffset), w.params, w.version, nextTime) return wrt, rdr, err diff --git a/pkg/segment/segment.go b/pkg/segment/segment.go index b5e0908..6614362 100644 --- a/pkg/segment/segment.go +++ b/pkg/segment/segment.go @@ -286,6 +286,7 @@ func (s Segment) Migrate(mversion message.Version, iversion index.Version, param defer func() { _ = oldLog.Close() }() if oldLog.Version() == mversion { + // TODO support index only rewrites return nil } @@ -408,8 +409,9 @@ type RewriteSegment struct { Stats Stats SurviveOffsets map[int64]struct{} - DeletedOffsets map[int64]struct{} - DeletedSize int64 + // DeletedOffsets map[int64]struct{} + DeletedMessages []message.Message + DeletedSize int64 } func (r *RewriteSegment) GetNewSegment() Segment { @@ -427,7 +429,6 @@ func (s Segment) forRewrite() (*RewriteSegment, error) { Segment: s.NewAt(s.Offset), SurviveOffsets: map[int64]struct{}{}, - DeletedOffsets: map[int64]struct{}{}, } dst.Log = fmt.Sprintf("%s.rewrite.%s", s.Log, randStr) @@ -469,7 +470,7 @@ func (src Segment) Rewrite(dropOffsets map[int64]struct{}, params index.Params, } if _, ok := dropOffsets[msg.Offset]; ok { - dst.DeletedOffsets[msg.Offset] = struct{}{} + dst.DeletedMessages = append(dst.DeletedMessages, msg) dst.DeletedSize += message.Size(msg, srcVersion) + params.Size() } else { dstPosition, err := dstLog.Write(msg) diff --git a/trim_age.go b/trim_age.go index 075a900..8c58b28 100644 --- a/trim_age.go +++ b/trim_age.go @@ -64,7 +64,7 @@ SEARCH: // TrimByAge tries to remove the messages at the start of the log before given time. // // returns the offsets it deleted and the amount of storage freed -func TrimByAge(ctx context.Context, l Log, before time.Time) (map[int64]struct{}, int64, error) { +func TrimByAge(ctx context.Context, l Log, before time.Time) ([]Message, int64, error) { offsets, err := FindByAge(ctx, l, before) if err != nil { return nil, 0, err @@ -72,11 +72,20 @@ func TrimByAge(ctx context.Context, l Log, before time.Time) (map[int64]struct{} return l.Delete(offsets) } -// TrimByAgeMulti is similar to ByAge, but will try to remove messages from multiple segments -func TrimByAgeMulti(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { +// TrimByAgeMulti is similar to [TrimByAge], but will try to remove messages from multiple segments +func TrimByAgeMulti(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) ([]Message, int64, error) { offsets, err := FindByAge(ctx, l, before) if err != nil { return nil, 0, err } return DeleteMulti(ctx, l, offsets, backoff) } + +// TrimByAgeMultiOffsets is similar to [TrimByAgeMulti], but only returns the deleted offsets +func TrimByAgeMultiOffsets(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { + offsets, err := FindByAge(ctx, l, before) + if err != nil { + return nil, 0, err + } + return DeleteMultiOffsets(ctx, l, offsets, backoff) +} diff --git a/trim_age_test.go b/trim_age_test.go index f2e9735..66587c9 100644 --- a/trim_age_test.go +++ b/trim_age_test.go @@ -75,9 +75,10 @@ func testByAgeAll(t *testing.T) { require.NoError(t, err) trimTime := msgs[len(msgs)-1].Time.Add(time.Millisecond) - off, sz, err := TrimByAge(context.TODO(), l, trimTime) + deletedMsgs, sz, err := TrimByAge(context.TODO(), l, trimTime) require.NoError(t, err) - require.Len(t, off, 20) + require.Len(t, deletedMsgs, 20) + require.Equal(t, msgs, deletedMsgs) require.Equal(t, l.Size(msgs[0])*20, sz) coff, cmsgs, err := l.Consume(OffsetOldest, 32) diff --git a/trim_count.go b/trim_count.go index 807eab4..2110b37 100644 --- a/trim_count.go +++ b/trim_count.go @@ -55,7 +55,7 @@ func FindByCount(ctx context.Context, l Log, max int) (map[int64]struct{}, error // in the log under max count. // // returns the offsets it deleted and the amount of storage freed -func TrimByCount(ctx context.Context, l Log, max int) (map[int64]struct{}, int64, error) { +func TrimByCount(ctx context.Context, l Log, max int) ([]Message, int64, error) { offsets, err := FindByCount(ctx, l, max) if err != nil { return nil, 0, err @@ -63,11 +63,20 @@ func TrimByCount(ctx context.Context, l Log, max int) (map[int64]struct{}, int64 return l.Delete(offsets) } -// TrimByCountMulti is similar to ByCount, but will try to remove messages from multiple segments -func TrimByCountMulti(ctx context.Context, l Log, max int, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { +// TrimByCountMulti is similar to [TrimByCount], but will try to remove messages from multiple segments +func TrimByCountMulti(ctx context.Context, l Log, max int, backoff DeleteMultiBackoff) ([]Message, int64, error) { offsets, err := FindByCount(ctx, l, max) if err != nil { return nil, 0, err } return DeleteMulti(ctx, l, offsets, backoff) } + +// TrimByCountMultiOffsets is similar to [TrimByCountMulti], but only returns the deleted offsets +func TrimByCountMultiOffsets(ctx context.Context, l Log, max int, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { + offsets, err := FindByCount(ctx, l, max) + if err != nil { + return nil, 0, err + } + return DeleteMultiOffsets(ctx, l, offsets, backoff) +} diff --git a/trim_offset.go b/trim_offset.go index 255a792..70ed19d 100644 --- a/trim_offset.go +++ b/trim_offset.go @@ -53,7 +53,7 @@ func FindByOffset(ctx context.Context, l Log, before int64) (map[int64]struct{}, // TrimByOffset tries to remove the messages at the start of the log before offset // // returns the offsets it deleted and the amount of storage freed -func TrimByOffset(ctx context.Context, l Log, before int64) (map[int64]struct{}, int64, error) { +func TrimByOffset(ctx context.Context, l Log, before int64) ([]Message, int64, error) { offsets, err := FindByOffset(ctx, l, before) if err != nil { return nil, 0, err @@ -61,11 +61,20 @@ func TrimByOffset(ctx context.Context, l Log, before int64) (map[int64]struct{}, return l.Delete(offsets) } -// TrimByOffsetMulti is similar to ByOffset, but will try to remove messages from multiple segments -func TrimByOffsetMulti(ctx context.Context, l Log, before int64, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { +// TrimByOffsetMulti is similar to [TrimByOffset], but will try to remove messages from multiple segments +func TrimByOffsetMulti(ctx context.Context, l Log, before int64, backoff DeleteMultiBackoff) ([]Message, int64, error) { offsets, err := FindByOffset(ctx, l, before) if err != nil { return nil, 0, err } return DeleteMulti(ctx, l, offsets, backoff) } + +// TrimByOffsetMultiOffsets is similar to [TrimByOffsetMulti], but only returns the deleted offsets +func TrimByOffsetMultiOffsets(ctx context.Context, l Log, before int64, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { + offsets, err := FindByOffset(ctx, l, before) + if err != nil { + return nil, 0, err + } + return DeleteMultiOffsets(ctx, l, offsets, backoff) +} diff --git a/trim_size.go b/trim_size.go index 24b41f6..bcc4178 100644 --- a/trim_size.go +++ b/trim_size.go @@ -53,7 +53,7 @@ func FindBySize(ctx context.Context, l Log, sz int64) (map[int64]struct{}, error // TrimBySize tries to remove messages until log size is less than sz // // returns the offsets it deleted and the amount of storage freed -func TrimBySize(ctx context.Context, l Log, sz int64) (map[int64]struct{}, int64, error) { +func TrimBySize(ctx context.Context, l Log, sz int64) ([]Message, int64, error) { offsets, err := FindBySize(ctx, l, sz) if err != nil { return nil, 0, err @@ -61,8 +61,17 @@ func TrimBySize(ctx context.Context, l Log, sz int64) (map[int64]struct{}, int64 return l.Delete(offsets) } -// TrimBySizeMulti is similar to BySize, but will try to remove messages from multiple segments -func TrimBySizeMulti(ctx context.Context, l Log, sz int64, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { +// TrimBySizeMulti is similar to [TrimBySize], but will try to remove messages from multiple segments +func TrimBySizeMulti(ctx context.Context, l Log, sz int64, backoff DeleteMultiBackoff) ([]Message, int64, error) { + offsets, err := FindBySize(ctx, l, sz) + if err != nil { + return nil, 0, err + } + return DeleteMulti(ctx, l, offsets, backoff) +} + +// TrimBySizeMultiOffsets is similar to [TrimBySizeMulti], but only returns the deleted offsets +func TrimBySizeMultiOffsets(ctx context.Context, l Log, sz int64, backoff DeleteMultiBackoff) ([]Message, int64, error) { offsets, err := FindBySize(ctx, l, sz) if err != nil { return nil, 0, err diff --git a/typed.go b/typed.go index 35a6729..2dda564 100644 --- a/typed.go +++ b/typed.go @@ -42,7 +42,7 @@ type TLog[K any, V any] interface { OffsetByTime(start time.Time) (offset int64, messageTime time.Time, err error) // Delete see [Log.Delete] - Delete(offsets map[int64]struct{}) (deletedOffsets map[int64]struct{}, deletedSize int64, err error) + Delete(offsets map[int64]struct{}) (deletedMessages []TMessage[K, V], deletedSize int64, err error) // Size see [Log.Size] Size(m Message) int64 @@ -179,6 +179,22 @@ func (l *tlog[K, V]) GetByTime(start time.Time) (TMessage[K, V], error) { return l.decode(msg) } +func (l *tlog[K, V]) Delete(offsets map[int64]struct{}) ([]TMessage[K, V], int64, error) { + messages, sz, err := l.Log.Delete(offsets) + if err != nil { + return nil, 0, err + } + + tmessages := make([]TMessage[K, V], len(messages)) + for i, msg := range messages { + tmessages[i], err = l.decode(msg) + if err != nil { + return nil, sz, err + } + } + return tmessages, sz, nil +} + func (l *tlog[K, V]) Raw() Log { return l.Log }