Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.24.9-bookworm
FROM golang:1.21.1-bookworm

WORKDIR /tests

Expand Down
19 changes: 1 addition & 18 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,30 +32,13 @@ type HTTPClient struct {

func DefaultClient(url url.URL, username string, password string) HTTPClient {
return HTTPClient{
client: http.Client{
Timeout: 60 * time.Second,
Transport: &http.Transport{},
},
client: http.Client{Timeout: 60 * time.Second},
Url: url,
Username: username,
Password: password,
}
}

// NewClient creates a new HTTPClient with its own connection pool.
// Use this in parallel tests to avoid connection sharing issues.
func NewClient(base HTTPClient) HTTPClient {
return HTTPClient{
client: http.Client{
Timeout: 60 * time.Second,
Transport: &http.Transport{},
},
Url: base.Url,
Username: base.Username,
Password: base.Password,
}
}

func (client *HTTPClient) baseAPIURL(path string) (x string) {
x, _ = url.JoinPath(client.Url.String(), "api/v1/", path)
return
Expand Down
49 changes: 16 additions & 33 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,46 +1,29 @@
module quest

go 1.24.9
go 1.21.1

require (
github.com/apache/arrow-go/v18 v18.5.2
github.com/minio/minio-go v6.0.14+incompatible
github.com/stretchr/testify v1.11.1
github.com/stretchr/testify v1.8.4
github.com/xitongsys/parquet-go v1.6.2
github.com/xitongsys/parquet-go-source v0.0.0-20230919034749-0b16411e6349
)

require (
github.com/andybalholm/brotli v1.2.0 // indirect
github.com/apache/thrift v0.22.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516 // indirect
github.com/apache/thrift v0.14.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-ini/ini v1.25.4 // indirect
github.com/goccy/go-json v0.10.5 // indirect
github.com/golang/snappy v1.0.0 // indirect
github.com/google/flatbuffers v25.12.19+incompatible // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.18.4 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.25 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/pierrec/lz4/v4 v4.1.8 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/smartystreets/goconvey v1.8.1 // indirect
github.com/zeebo/xxh3 v1.1.0 // indirect
golang.org/x/crypto v0.48.0 // indirect
golang.org/x/exp v0.0.0-20260112195511-716be5621a96 // indirect
golang.org/x/mod v0.33.0 // indirect
golang.org/x/net v0.50.0 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.41.0 // indirect
golang.org/x/telemetry v0.0.0-20260209163413-e7419c687ee4 // indirect
golang.org/x/text v0.34.0 // indirect
golang.org/x/tools v0.42.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect
google.golang.org/grpc v1.79.1 // indirect
google.golang.org/protobuf v1.36.11 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
1,109 changes: 1,017 additions & 92 deletions go.sum

Large diffs are not rendered by default.

172 changes: 72 additions & 100 deletions integrity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package main
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
Expand All @@ -28,54 +27,77 @@ import (
"os"
"os/exec"
"path/filepath"
"sort"
"strconv"
"strings"
"testing"
"time"

"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/parquet/file"
"github.com/apache/arrow-go/v18/parquet/pqarrow"
"github.com/minio/minio-go"
"github.com/stretchr/testify/require"

"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/reader"
)

type Flog struct {
Host string `json:"host"`
UserId string `json:"user-identifier"`
Timestamp string `json:"datetime"`
Method string `json:"method"`
Request string `json:"request"`
Protocol string `json:"protocol"`
Status float64 `json:"status"`
ByteCount float64 `json:"bytes"`
Referer string `json:"referer"`
Host string `json:"host"`
UserId string `json:"user-identifier"`
Timestamp string `json:"datetime"`
Method string `json:"method"`
Request string `json:"request"`
Protocol string `json:"protocol"`
Status uint16 `json:"status"`
ByteCount uint64 `json:"bytes"`
Referer string `json:"referer"`
}

// Same as `Flog`, but all fields are pointers, because `parquet-go` is only
// working when fields are pointers.
type ParquetFlog struct {
Host *string `parquet:"name=host, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
UserId *string `parquet:"name=user-identifier, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
Timestamp *string `parquet:"name=datetime, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
Method *string `parquet:"name=method, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
Request *string `parquet:"name=request, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
Protocol *string `parquet:"name=protocol, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
Status *uint16 `parquet:"name=status, type=INT32, encoding=PLAIN"`
ByteCount *uint64 `parquet:"name=bytes, type=INT32, encoding=PLAIN"`
Referer *string `parquet:"name=referer, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
}

func (flog *ParquetFlog) Deref() Flog {
return Flog{
Host: *flog.Host,
UserId: *flog.UserId,
Timestamp: *flog.Timestamp,
Method: *flog.Method,
Request: *flog.Request,
Protocol: *flog.Protocol,
Status: *flog.Status,
ByteCount: *flog.ByteCount,
Referer: *flog.Referer,
}
}

// - Send logs to Parseable
// - Wait for sync
// - Download parquet files from the store created by Parseable for the minute
// - Compare the sent logs with the ones loaded from the downloaded parquet
func TestIntegrity(t *testing.T) {
t.Parallel()
stream := uniqueStream(t)
CreateStream(t, NewGlob.QueryClient, stream)
CreateStream(t, NewGlob.QueryClient, NewGlob.Stream)
iterations := 1
flogsPerIteration := 100

parseableSyncWait := 2 * time.Minute // NOTE: This needs to be in sync with Parseable's.
parseableSyncWait := 3 * time.Minute // NOTE: This needs to be in sync with Parseable's.

// - Generate log files using `flog`
// - Load them into `Flog` structs
// - Ingest them into Parseable

flogs := make([]Flog, 0, iterations*flogsPerIteration)

tmpDir := t.TempDir()
for i := range iterations {
flogsFile := filepath.Join(tmpDir, fmt.Sprintf("%s_%d_%s.log", stream, i, randSuffix()))
for i := 0; i < iterations; i++ {
flogsFile := fmt.Sprintf("%d.log", i)

err := exec.Command("flog",
"--number", strconv.Itoa(flogsPerIteration),
Expand All @@ -89,7 +111,7 @@ func TestIntegrity(t *testing.T) {

loadedFlogs := loadFlogsFromFile(flogsFile)

err = ingestFlogs(loadedFlogs, stream)
err = ingestFlogs(loadedFlogs, NewGlob.Stream)
if err != nil {
t.Fatal("error ingesting flogs", err)
}
Expand All @@ -105,26 +127,20 @@ func TestIntegrity(t *testing.T) {
// XXX: We don't need to sleep for the entire minute, just until the next minute boundary.
}

parquetFiles := downloadParquetFiles(stream, NewGlob.MinIoConfig)
parquetFiles := downloadParquetFiles(NewGlob.Stream, NewGlob.MinIoConfig)
actualFlogs := loadFlogsFromParquetFiles(parquetFiles)

require.Equal(t, len(flogs), len(actualFlogs), "row count mismatch")

// Row order from parquet not deterministic under concurrent writes.
// Sort both slices on a stable composite key before compare.
sortKey := func(f Flog) string {
return fmt.Sprintf("%s|%s|%s|%s|%v|%v|%s",
f.Timestamp, f.Host, f.UserId, f.Request, f.Status, f.ByteCount, f.Referer)
}
sort.Slice(flogs, func(i, j int) bool { return sortKey(flogs[i]) < sortKey(flogs[j]) })
sort.Slice(actualFlogs, func(i, j int) bool { return sortKey(actualFlogs[i]) < sortKey(actualFlogs[j]) })
rowCount := len(actualFlogs)

for i, expectedFlog := range flogs {
actualFlog := actualFlogs[i]
require.Equal(t, expectedFlog, actualFlog)
// The rows in parquet written by Parseable will be latest first, so we
// compare the first of ours with the last of what we got from Parseable's
// store.
actualFlog := actualFlogs[rowCount-i-1].Deref()
require.Equal(t, actualFlog, expectedFlog)
}

DeleteStream(t, NewGlob.QueryClient, stream)
DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
}

func ingestFlogs(flogs []Flog, stream string) error {
Expand Down Expand Up @@ -196,90 +212,46 @@ func downloadParquetFiles(stream string, config MinIoConfig) []string {
f.Close()
}

// Reverse the filenames, because we want latest files first (only if there are multiple files)
if len(downloadedFileNames) > 1 {
for i, j := 0, len(downloadedFileNames)-1; i < j; i, j = i+1, j-1 {
downloadedFileNames[i], downloadedFileNames[j] = downloadedFileNames[j], downloadedFileNames[i]
}
}

slog.Info("downloaded files", "paths", downloadedFileNames)

return downloadedFileNames
}

func loadFlogsFromParquetFile(path string) []Flog {
func loadFlogsFromParquetFile(path string) []ParquetFlog {
fr, err := local.NewLocalFileReader(path)
slog.Info("reading parquet file", "path", path)

rdr, err := file.OpenParquetFile(path, false)
if err != nil {
slog.Error("can't open parquet file", "error", err)
return nil
slog.Error("can't create local file reader", "error", err)
}
defer rdr.Close()

arrowRdr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
if err != nil {
slog.Error("can't create arrow reader", "error", err)
return nil
}
defer fr.Close()

tbl, err := arrowRdr.ReadTable(context.Background())
pr, err := reader.NewParquetReader(fr, new(ParquetFlog), 4)
if err != nil {
slog.Error("can't read table", "error", err)
return nil
slog.Error("can't create parquet reader", "error", err)
}
defer tbl.Release()

numRows := int(tbl.NumRows())
slog.Info("read parquet", "rows", numRows, "path", path)
defer pr.ReadStop()

// Build column index for lookup by name
colIndex := make(map[string]int)
for i := 0; i < int(tbl.NumCols()); i++ {
colIndex[tbl.Column(i).Name()] = i
}
flogs := make([]ParquetFlog, pr.GetNumRows())

getStringCol := func(name string) *array.String {
idx, ok := colIndex[name]
if !ok {
return nil
}
return tbl.Column(idx).Data().Chunk(0).(*array.String)
}

getFloat64Col := func(name string) *array.Float64 {
idx, ok := colIndex[name]
if !ok {
return nil
}
return tbl.Column(idx).Data().Chunk(0).(*array.Float64)
}

hostCol := getStringCol("host")
userIdCol := getStringCol("user-identifier")
datetimeCol := getStringCol("datetime")
methodCol := getStringCol("method")
requestCol := getStringCol("request")
protocolCol := getStringCol("protocol")
statusCol := getFloat64Col("status")
bytesCol := getFloat64Col("bytes")
refererCol := getStringCol("referer")

flogs := make([]Flog, numRows)
for i := range numRows {
flogs[i] = Flog{
Host: hostCol.Value(i),
UserId: userIdCol.Value(i),
Timestamp: datetimeCol.Value(i),
Method: methodCol.Value(i),
Request: requestCol.Value(i),
Protocol: protocolCol.Value(i),
Status: statusCol.Value(i),
ByteCount: bytesCol.Value(i),
Referer: refererCol.Value(i),
}
if err = pr.Read(&flogs); err != nil {
slog.Error("can't read parquet file", "error", err)
}

return flogs
}

func loadFlogsFromParquetFiles(parquetFiles []string) []Flog {
func loadFlogsFromParquetFiles(parquetFiles []string) []ParquetFlog {
slog.Info("loading flogs from parquet files", "paths", parquetFiles, "count", len(parquetFiles))
flogs := make([]Flog, 0, len(parquetFiles)*10)
flogs := make([]ParquetFlog, 0, len(parquetFiles)*10)

for _, parquetFile := range parquetFiles {
flogs = append(flogs, loadFlogsFromParquetFile(parquetFile)...)
Expand Down
Loading
Loading