Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ bench-get:
go test -bench=BenchmarkSingle/Get/V2 -benchmem -run XXX

bench-multi:
go test -bench=BenchmarkMulti/V2 -benchmem -run XXX
go test -bench=BenchmarkMulti//V2 -benchmem -run XXX

bench-keys:
go test -bench=BenchmarkKeys -benchmem -run XXX
Expand Down
5 changes: 3 additions & 2 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ type Log interface {
// Delete tries to delete a set of messages by their offset
// from the log and returns the amount of storage deleted
// It does not guarantee that it will delete all messages,
// it returns the set of actually deleted offsets.
Delete(offsets map[int64]struct{}) (deletedOffsets map[int64]struct{}, deletedSize int64, err error)
// it returns list of actually deleted messages.
Delete(offsets map[int64]struct{}) (deletedMessages []Message, deletedSize int64, err error)

// Size returns the amount of storage a message occupies in the
// NewSegmentsVersion format (see VersionOptions), plus the index overhead.
Expand Down Expand Up @@ -201,6 +201,7 @@ func Recover(dir string, opts Options) error {
})
}

// Migrate rewrites all segments with a concrete options and version
func Migrate(dir string, opts Options, version Version) error {
if version == vUnknown {
return fmt.Errorf("migrate: version must be specified (e.g. klevdb.V2)")
Expand Down
15 changes: 12 additions & 3 deletions compact_deletes.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,28 @@ SEARCH:
// and are therefore no longer relevant/active.
//
// returns the offsets it deleted and the amount of storage freed
func CompactDeletes(ctx context.Context, l Log, before time.Time) (map[int64]struct{}, int64, error) {
func CompactDeletes(ctx context.Context, l Log, before time.Time) ([]Message, int64, error) {
offsets, err := FindDeletes(ctx, l, before)
if err != nil {
return nil, 0, err
}
return l.Delete(offsets)
}

// CompactDeletesMulti is similar to Deletes, but will try to remove messages from multiple segments
func CompactDeletesMulti(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) {
// CompactDeletesMulti is similar to [CompactDeletes], but will try to remove messages from multiple segments
func CompactDeletesMulti(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) ([]Message, int64, error) {
offsets, err := FindDeletes(ctx, l, before)
if err != nil {
return nil, 0, err
}
return DeleteMulti(ctx, l, offsets, backoff)
}

// CompactDeletesMultiOffsets is similar to [CompactDeletesMulti], but only returns the deleted offsets
func CompactDeletesMultiOffsets(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) {
offsets, err := FindDeletes(ctx, l, before)
if err != nil {
return nil, 0, err
}
return DeleteMultiOffsets(ctx, l, offsets, backoff)
}
16 changes: 8 additions & 8 deletions compact_deletes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ func TestDeletes(t *testing.T) {
require.NoError(t, err)
defer l.Close()

off, cmp, err := CompactDeletes(context.TODO(), l, time.Now())
deletedMsgs, cmp, err := CompactDeletes(context.TODO(), l, time.Now())
require.NoError(t, err)
require.Empty(t, off)
require.Empty(t, deletedMsgs)
require.Equal(t, int64(0), cmp)
})

Expand All @@ -33,9 +33,9 @@ func TestDeletes(t *testing.T) {
_, err = l.Publish(msgs)
require.NoError(t, err)

off, cmp, err := CompactDeletes(context.TODO(), l, time.Now())
deletedMsgs, cmp, err := CompactDeletes(context.TODO(), l, time.Now())
require.NoError(t, err)
require.Empty(t, off)
require.Empty(t, deletedMsgs)
require.Equal(t, int64(0), cmp)
})

Expand All @@ -55,11 +55,11 @@ func TestDeletes(t *testing.T) {
_, err = l.Publish(msgs)
require.NoError(t, err)

off, cmp, err := CompactDeletes(context.TODO(), l, time.Now())
deletedMsgs, cmp, err := CompactDeletes(context.TODO(), l, time.Now())
require.NoError(t, err)
require.Len(t, off, 5)
for i := range nmsgs {
require.Contains(t, off, int64(i))
require.Len(t, deletedMsgs, 5)
for _, nmsg := range nmsgs {
require.Contains(t, deletedMsgs, nmsg)
}
require.Equal(t, l.Size(nmsgs[0])*int64(len(nmsgs)), cmp)
})
Expand Down
17 changes: 13 additions & 4 deletions compact_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,26 +51,35 @@ SEARCH:
return offsets, nil
}

// Updates tries to remove messages before given time that are repeated
// CompactUpdates tries to remove messages before given time that are repeated
// further in the log leaving only the last message for a given key.
//
// This is similar to removing the old value updates,
// leaving only the current value (last update) for a key.
//
// returns the offsets it deleted and the amount of storage freed
func CompactUpdates(ctx context.Context, l Log, before time.Time) (map[int64]struct{}, int64, error) {
func CompactUpdates(ctx context.Context, l Log, before time.Time) ([]Message, int64, error) {
offsets, err := FindUpdates(ctx, l, before)
if err != nil {
return nil, 0, err
}
return l.Delete(offsets)
}

// UpdatesMulti is similar to Updates, but will try to remove messages from multiple segments
func CompactUpdatesMulti(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) {
// CompactUpdatesMulti is similar to [CompactUpdates], but will try to remove messages from multiple segments
func CompactUpdatesMulti(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) ([]Message, int64, error) {
offsets, err := FindUpdates(ctx, l, before)
if err != nil {
return nil, 0, err
}
return DeleteMulti(ctx, l, offsets, backoff)
}

// CompactUpdatesMultiOffsets is similar to [CompactUpdatesMulti], but only returns the deleted offsets
func CompactUpdatesMultiOffsets(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) {
offsets, err := FindUpdates(ctx, l, before)
if err != nil {
return nil, 0, err
}
return DeleteMultiOffsets(ctx, l, offsets, backoff)
}
52 changes: 26 additions & 26 deletions compact_updates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ func TestUpdates(t *testing.T) {
_, err = l.Publish(msgs)
require.NoError(t, err)

off, cmp, err := CompactUpdates(context.TODO(), l, time.Now())
deletedMsgs, cmp, err := CompactUpdates(context.TODO(), l, time.Now())
require.NoError(t, err)
require.Empty(t, off)
require.Empty(t, deletedMsgs)
require.Equal(t, int64(0), cmp)

gmsg, err := l.GetByKey(msgs[0].Key)
Expand All @@ -56,10 +56,10 @@ func TestUpdates(t *testing.T) {
_, err = l.Publish(dmsgs)
require.NoError(t, err)

off, cmp, err := CompactUpdates(context.TODO(), l, time.Now())
deletedMsgs, cmp, err := CompactUpdates(context.TODO(), l, time.Now())
require.NoError(t, err)
require.Len(t, off, 1)
require.Contains(t, off, int64(0))
require.Len(t, deletedMsgs, 1)
require.Contains(t, deletedMsgs, msgs[0])
require.Equal(t, l.Size(msgs[0]), cmp)

gmsg, err := l.GetByKey(msgs[0].Key)
Expand All @@ -80,10 +80,10 @@ func TestUpdates(t *testing.T) {
_, err = l.Publish(dmsgs)
require.NoError(t, err)

off, cmp, err := CompactUpdates(context.TODO(), l, time.Now())
deletedMsgs, cmp, err := CompactUpdates(context.TODO(), l, time.Now())
require.NoError(t, err)
require.Len(t, off, 1)
require.Contains(t, off, int64(4))
require.Len(t, deletedMsgs, 1)
require.Contains(t, deletedMsgs, msgs[4])
require.Equal(t, l.Size(msgs[0]), cmp)

gmsg, err := l.GetByKey(msgs[4].Key)
Expand All @@ -99,20 +99,21 @@ func TestUpdates(t *testing.T) {
_, err = l.Publish(msgs)
require.NoError(t, err)

_, err = l.Publish(msgs)
upmsgs := slices.Clone(msgs)
_, err = l.Publish(upmsgs)
require.NoError(t, err)

off, cmp, err := CompactUpdates(context.TODO(), l, time.Now())
deletedMsgs, cmp, err := CompactUpdates(context.TODO(), l, time.Now())
require.NoError(t, err)
require.Len(t, off, len(msgs))
for i := range msgs {
require.Contains(t, off, int64(i))
require.Len(t, deletedMsgs, len(msgs))
for _, msg := range msgs {
require.Contains(t, deletedMsgs, msg)
}
require.Equal(t, l.Size(msgs[0])*int64(len(msgs)), cmp)

gmsg, err := l.GetByKey(msgs[1].Key)
require.NoError(t, err)
require.Equal(t, msgs[1], gmsg)
require.Equal(t, upmsgs[1], gmsg)
})

t.Run("Time", func(t *testing.T) {
Expand All @@ -130,11 +131,11 @@ func TestUpdates(t *testing.T) {
_, err = l.Publish(nmsgs)
require.NoError(t, err)

off, cmp, err := CompactUpdates(context.TODO(), l, nmsgs[2].Time)
deletedMsgs, cmp, err := CompactUpdates(context.TODO(), l, nmsgs[2].Time)
require.NoError(t, err)
require.Len(t, off, 3)
require.Len(t, deletedMsgs, 3)
for i := range 3 {
require.Contains(t, off, int64(i))
require.Contains(t, deletedMsgs, msgs[i])
}
require.Equal(t, l.Size(msgs[0])*3, cmp)

Expand All @@ -148,17 +149,16 @@ func TestUpdates(t *testing.T) {
require.NoError(t, err)
defer l.Close()

_, err = l.Publish([]Message{
{Key: []byte("x")},
{},
{},
})
nmsgs := message.Gen(3)
nmsgs[1].Key = nil
nmsgs[2].Key = nil
_, err = l.Publish(nmsgs)
require.NoError(t, err)

off, cmp, err := CompactUpdates(context.TODO(), l, time.Now())
deletedMsgs, cmp, err := CompactUpdates(context.TODO(), l, time.Now())
require.NoError(t, err)
require.Len(t, off, 1)
require.Contains(t, off, int64(1))
require.Equal(t, l.Size(Message{}), cmp)
require.Len(t, deletedMsgs, 1)
require.Contains(t, deletedMsgs, nmsgs[1])
require.Equal(t, l.Size(nmsgs[1]), cmp)
})
}
40 changes: 34 additions & 6 deletions delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,36 @@ func DeleteMultiWithWait(d time.Duration) DeleteMultiBackoff {
//
// [DeleteMultiBackoff] is called on each iteration to give
// others a chance to work with the log, while being deleted
func DeleteMulti(ctx context.Context, l Log, offsets map[int64]struct{}, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) {
func DeleteMulti(ctx context.Context, l Log, offsets map[int64]struct{}, backoff DeleteMultiBackoff) ([]Message, int64, error) {
var remainingOffsets = maps.Clone(offsets)
var deletedMessages []Message
var deletedSize int64

for len(remainingOffsets) > 0 {
deleted, size, err := l.Delete(remainingOffsets)
switch {
case err != nil:
return deletedMessages, deletedSize, err
case len(deleted) == 0:
return deletedMessages, deletedSize, nil
}

deletedMessages = append(deletedMessages, deleted...)
deletedSize += size
for _, msg := range deleted {
delete(remainingOffsets, msg.Offset)
}

if err := backoff(ctx); err != nil {
return deletedMessages, deletedSize, err
}
}

return deletedMessages, deletedSize, nil
}

// DeleteMultiOffsets is similar to [DeleteMulti] but will only collect offsets instead of whole messages
func DeleteMultiOffsets(ctx context.Context, l Log, offsets map[int64]struct{}, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) {
var remainingOffsets = maps.Clone(offsets)
var deletedOffsets = map[int64]struct{}{}
var deletedSize int64
Expand All @@ -46,12 +75,11 @@ func DeleteMulti(ctx context.Context, l Log, offsets map[int64]struct{}, backoff
return deletedOffsets, deletedSize, nil
}

maps.Copy(deletedOffsets, deleted)
deletedSize += size
maps.DeleteFunc(remainingOffsets, func(k int64, v struct{}) bool {
_, ok := deleted[k]
return ok
})
for _, msg := range deleted {
deletedOffsets[msg.Offset] = struct{}{}
delete(remainingOffsets, msg.Offset)
}

if err := backoff(ctx); err != nil {
return deletedOffsets, deletedSize, err
Expand Down
40 changes: 39 additions & 1 deletion delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,45 @@ func TestDeleteMulti(t *testing.T) {
require.Equal(t, 10, stats.Messages)
require.Equal(t, 5, stats.Segments)

offsets, sz, err := DeleteMulti(context.TODO(), l, map[int64]struct{}{
deletedMsgs, sz, err := DeleteMulti(context.TODO(), l, map[int64]struct{}{
0: {},
2: {},
3: {},
4: {},
}, DeleteMultiWithWait(time.Millisecond))
require.NoError(t, err)
require.Len(t, deletedMsgs, 4)
require.Contains(t, deletedMsgs, msgs[0])
require.Contains(t, deletedMsgs, msgs[2])
require.Contains(t, deletedMsgs, msgs[3])
require.Contains(t, deletedMsgs, msgs[4])
require.Equal(t, l.Size(msgs[0])*4, sz)

stats, err = l.Stat()
require.NoError(t, err)
require.Equal(t, 6, stats.Messages)
require.Equal(t, 4, stats.Segments)
}

func TestDeleteMultiOffsets(t *testing.T) {
msgs := message.Gen(10)

l, err := Open(t.TempDir(), Options{
TimeIndex: true,
KeyIndex: true,
Rollover: 2 * (message.Size(msgs[0], message.V2) - 1),
})
require.NoError(t, err)
defer l.Close()

publishBatched(t, l, msgs, 1)

stats, err := l.Stat()
require.NoError(t, err)
require.Equal(t, 10, stats.Messages)
require.Equal(t, 5, stats.Segments)

offsets, sz, err := DeleteMultiOffsets(context.TODO(), l, map[int64]struct{}{
0: {},
2: {},
3: {},
Expand Down
10 changes: 5 additions & 5 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func (l *log) OffsetByTime(start time.Time) (int64, time.Time, error) {
return msg.Offset, msg.Time, nil
}

func (l *log) Delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, error) {
func (l *log) Delete(offsets map[int64]struct{}) ([]Message, int64, error) {
if l.opts.Readonly {
return nil, 0, ErrReadonly
}
Expand All @@ -373,7 +373,7 @@ func (l *log) Delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, err
return l.delete(offsets)
}

func (l *log) delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, error) {
func (l *log) delete(offsets map[int64]struct{}) ([]Message, int64, error) {
rdr, err := l.findDeleteReader(offsets)
if err != nil {
return nil, 0, err
Expand Down Expand Up @@ -418,7 +418,7 @@ func (l *log) delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, err
return nil, 0, err
}

if len(rs.DeletedOffsets) == 0 {
if len(rs.DeletedMessages) == 0 {
// deleted nothing, just remove rewrite files
return nil, 0, rs.Remove()
}
Expand Down Expand Up @@ -447,7 +447,7 @@ func (l *log) delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, err
l.readers = append(l.readers, newWriter.reader)
}

return rs.DeletedOffsets, rs.DeletedSize, nil
return rs.DeletedMessages, rs.DeletedSize, nil
}
l.writerMu.Unlock()

Expand Down Expand Up @@ -480,7 +480,7 @@ func (l *log) delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, err
}
l.readers = newReaders

return rs.DeletedOffsets, rs.DeletedSize, nil
return rs.DeletedMessages, rs.DeletedSize, nil
}

func (l *log) findDeleteReader(offsets map[int64]struct{}) (*reader, error) {
Expand Down
Loading
Loading