Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (

func Compact(ctx context.Context, l Log, age time.Duration, boff DeleteMultiBackoff) error {
updatesBefore := time.Now().Add(-age)
if _, _, err := CompactUpdatesMulti(ctx, l, updatesBefore, boff); err != nil {
if _, _, err := CompactUpdatesMultiOffsets(ctx, l, updatesBefore, boff); err != nil {
return err
}
deletesBefore := time.Now().Add(-age * 2)
if _, _, err := CompactDeletesMulti(ctx, l, deletesBefore, boff); err != nil {
if _, _, err := CompactDeletesMultiOffsets(ctx, l, deletesBefore, boff); err != nil {
return err
}
return l.GC(0)
Expand Down
8 changes: 3 additions & 5 deletions compact_deletes.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ import (
art "github.com/plar/go-adaptive-radix-tree/v2"
)

// FindDeletes returns a set of offsets for messages with
// nil value for a given key, before a given time.
// FindDeletes returns a set of offsets for messages with nil value for a given key, before a given time.
//
// Messages that have a nil value are considered deletes
// for this key, and therefore eligible for deletion.
// Messages that have a nil value are considered deletes for this key, and therefore eligible for deletion.
func FindDeletes(ctx context.Context, l Log, before time.Time) (map[int64]struct{}, error) {
maxOffset, err := l.NextOffset()
if err != nil {
Expand Down Expand Up @@ -66,7 +64,7 @@ SEARCH:
// This is similar to removing keys, which were deleted (e.g. value set to nil)
// and are therefore no longer relevant/active.
//
// returns the offsets it deleted and the amount of storage freed
// returns the messages it deleted and the amount of storage freed
func CompactDeletes(ctx context.Context, l Log, before time.Time) ([]Message, int64, error) {
offsets, err := FindDeletes(ctx, l, before)
if err != nil {
Expand Down
8 changes: 3 additions & 5 deletions compact_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ import (
art "github.com/plar/go-adaptive-radix-tree/v2"
)

// FindUpdates returns a set of offsets for messages that have
// the same key further in the log, before a given time.
// FindUpdates returns a set of offsets for messages that have the same key further in the log, before a given time.
//
// Messages before the last one for a given key are considered updates
// that are no longer relevant, and therefore are eligible for deletion.
Expand Down Expand Up @@ -54,10 +53,9 @@ SEARCH:
// CompactUpdates tries to remove messages before given time that are repeated
// further in the log leaving only the last message for a given key.
//
// This is similar to removing the old value updates,
// leaving only the current value (last update) for a key.
// This is similar to removing the old value updates, leaving only the current value (last update) for a key.
//
// returns the offsets it deleted and the amount of storage freed
// returns the messages it deleted and the amount of storage freed
func CompactUpdates(ctx context.Context, l Log, before time.Time) ([]Message, int64, error) {
offsets, err := FindUpdates(ctx, l, before)
if err != nil {
Expand Down
9 changes: 3 additions & 6 deletions delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,11 @@ func DeleteMultiWithWait(d time.Duration) DeleteMultiBackoff {
}
}

// DeleteMulti tries to delete all messages with offsets
// from the log and returns the amount of storage deleted
// DeleteMulti tries to delete all messages with offsets from the log and returns the amount of storage deleted
//
// If error is encountered, it will return the deleted offsets
// and size, together with the error
// # If error is encountered, it will return the deleted offsets and size, together with the error
//
// [DeleteMultiBackoff] is called on each iteration to give
// others a chance to work with the log, while being deleted
// [DeleteMultiBackoff] is called on each iteration to give others a chance to work with the log, while being deleted
func DeleteMulti(ctx context.Context, l Log, offsets map[int64]struct{}, backoff DeleteMultiBackoff) ([]Message, int64, error) {
var remainingOffsets = maps.Clone(offsets)
var deletedMessages []Message
Expand Down
5 changes: 2 additions & 3 deletions trim_age.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import (
"time"
)

// FindByAge returns a set of offsets for messages that are
// at the start of the log and before given time.
// FindByAge returns a set of offsets for messages that are at the start of the log and before given time.
func FindByAge(ctx context.Context, l Log, before time.Time) (map[int64]struct{}, error) {
maxOffset, _, err := l.OffsetByTime(before)
switch {
Expand Down Expand Up @@ -63,7 +62,7 @@ SEARCH:

// TrimByAge tries to remove the messages at the start of the log before given time.
//
// returns the offsets it deleted and the amount of storage freed
// returns the messages it deleted and the amount of storage freed
func TrimByAge(ctx context.Context, l Log, before time.Time) ([]Message, int64, error) {
offsets, err := FindByAge(ctx, l, before)
if err != nil {
Expand Down
8 changes: 3 additions & 5 deletions trim_count.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ import (
"context"
)

// FindByCount returns a set of offsets for messages that when
// removed will keep the number of messages in the log under max
// FindByCount returns a set of offsets for messages that when removed will keep the number of messages in the log under max
func FindByCount(ctx context.Context, l Log, max int) (map[int64]struct{}, error) {
stats, err := l.Stat()
switch {
Expand Down Expand Up @@ -51,10 +50,9 @@ func FindByCount(ctx context.Context, l Log, max int) (map[int64]struct{}, error
return offsets, nil
}

// TrimByCount tries to remove messages to keep the number of messages
// in the log under max count.
// TrimByCount tries to remove messages to keep the number of messages in the log under max count.
//
// returns the offsets it deleted and the amount of storage freed
// returns the messages it deleted and the amount of storage freed
func TrimByCount(ctx context.Context, l Log, max int) ([]Message, int64, error) {
offsets, err := FindByCount(ctx, l, max)
if err != nil {
Expand Down
5 changes: 2 additions & 3 deletions trim_offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import (
"github.com/klev-dev/klevdb/pkg/message"
)

// FindByOffset returns a set of offsets for messages whose
// offset is before a given offset
// FindByOffset returns a set of offsets for messages whose offset is before a given offset
func FindByOffset(ctx context.Context, l Log, before int64) (map[int64]struct{}, error) {
if before == message.OffsetOldest {
return map[int64]struct{}{}, nil
Expand Down Expand Up @@ -52,7 +51,7 @@ func FindByOffset(ctx context.Context, l Log, before int64) (map[int64]struct{},

// TrimByOffset tries to remove the messages at the start of the log before offset
//
// returns the offsets it deleted and the amount of storage freed
// returns the messages it deleted and the amount of storage freed
func TrimByOffset(ctx context.Context, l Log, before int64) ([]Message, int64, error) {
offsets, err := FindByOffset(ctx, l, before)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion trim_size.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func FindBySize(ctx context.Context, l Log, sz int64) (map[int64]struct{}, error

// TrimBySize tries to remove messages until log size is less than sz
//
// returns the offsets it deleted and the amount of storage freed
// returns the messages it deleted and the amount of storage freed
func TrimBySize(ctx context.Context, l Log, sz int64) ([]Message, int64, error) {
offsets, err := FindBySize(ctx, l, sz)
if err != nil {
Expand Down
Loading