Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
"mode": "debug",
"program": "${workspaceFolder}/cmd/kobs",
"args": [
"hub",
"--log.level=debug",
"--hub.config=../../deploy/docker/kobs/hub.yaml",
"--app.assets=''"
"--app.assets="
]
},
{
Expand All @@ -20,6 +21,7 @@
"mode": "debug",
"program": "${workspaceFolder}/cmd/kobs",
"args": [
"satellite",
"--log.level=debug",
"--satellite.config=../../deploy/docker/kobs/satellite.yaml",
"--satellite.token=unsecuretoken"
Expand Down
31 changes: 31 additions & 0 deletions docs/plugins/klogs.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ To use the klogs plugin the following configuration is needed in the satellites
| options.maxIdleConns | number | ClickHouse maximum number of idle connections. The default value is `5`. | No |
| options.maxOpenConns | number | ClickHouse maximum number of open connections. The default value is `10`. | No |
| options.materializedColumns | []string | A list of materialized columns. See [kobsio/klogs](https://github.com/kobsio/klogs#configuration) for more information. | No |
| options.integrations | []object | A list of integrations. See [Integrations](#integrations) for more information. | No |

```yaml
plugins:
Expand All @@ -36,6 +37,7 @@ plugins:
maxIdleConns:
maxOpenConns:
materializedColumns:
integrations:
```

## Insight Options
Expand All @@ -58,6 +60,35 @@ The following options can be used for a panel with the klogs plugin:
| queries | [[]Query](#query) | A list of queries, which can be selected by the user. This is only required for type `logs`. | Yes |
| aggregation | [Aggregation](#aggregation) | Options for the aggregation. This is only required for type `aggregation`. | Yes |

## Integrations

Integrations enhance the experience of the klogs plugin. The integrations are differentiated by the `type` property on the integration config. Integrations share the following properties:

| Field | Type | Description | Required |
| ----- | ---- | ----------- | -------- |
| name | string | Sets the name of the integration | No |
| type | string | Sets the integration type. | Yes |

The following integrations are available:

### Integration - Autolinks (`type: autolinks`)

The autolinks integration converts a value of a specific column to a link. This could be a link to a different kobs plugin. It's possible to use relative or absolute paths.
The options for this integration type are specified under the property `autolinks` and use the following configuration values:

| Field | Type | Description | Required |
| ----- | ---- | ----------- | -------- |
| columnName | string | The column name for which the autolinks should be applied | Yes |
| path | string | The path for the autolink (can be absolute or relative) | Yes |

The path property can contain placeholders for the document value and time frame values:

| Placeholder | Description |
| ----- | ---- |
| `<<value>>` | placeholder for the document value |
| `<<timeStart>>` | placeholder for the klogs start time (unix timestamp) |
| `<<timeEnd>>` | placeholder for the klogs end time (unix timestamp) |

### Query

| Field | Type | Description | Required |
Expand Down
2 changes: 1 addition & 1 deletion plugins/plugin-klogs/cmd/klogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (router *Router) getLogs(w http.ResponseWriter, r *http.Request) {

data := struct {
Documents []map[string]any `json:"documents"`
Fields []string `json:"fields"`
Fields []instance.Field `json:"fields"`
Count int64 `json:"count"`
Took int64 `json:"took"`
Buckets []instance.Bucket `json:"buckets"`
Expand Down
4 changes: 2 additions & 2 deletions plugins/plugin-klogs/pkg/instance/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type AggregationTimes struct {
// We can also directly say that the passed in field must be a number field, e.g. aggregation with the min, max, sum or
// avg operation can only run against number fields.
func generateFieldName(fieldName string, materializedColumns []string, customFields Fields, mustNumber bool) string {
if contains(defaultFields, fieldName) || contains(materializedColumns, fieldName) {
if containsField(defaultFields, Field{Name: fieldName}) || contains(materializedColumns, fieldName) {
return fieldName
}

Expand All @@ -61,7 +61,7 @@ func generateFieldName(fieldName string, materializedColumns []string, customFie
}

for _, field := range customFields.Number {
if field == fieldName {
if field.Name == fieldName {
return fmt.Sprintf("fields_number['%s']", fieldName)
}
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/plugin-klogs/pkg/instance/aggregation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestGenerateFieldName(t *testing.T) {
{field: "content_duration", mustNumber: false, expect: "fields_number['content_duration']"},
} {
t.Run(tt.field, func(t *testing.T) {
actual := generateFieldName(tt.field, nil, Fields{String: nil, Number: []string{"content_duration"}}, tt.mustNumber)
actual := generateFieldName(tt.field, nil, Fields{String: nil, Number: []Field{{Name: "content_duration"}}}, tt.mustNumber)
require.Equal(t, tt.expect, actual)
})
}
Expand Down
48 changes: 37 additions & 11 deletions plugins/plugin-klogs/pkg/instance/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,22 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
)

type Field struct {
Name string `json:"name"`
AutolinkPath string `json:"autolinkPath,omitempty"`
}

func fieldsFromNames(names ...string) []Field {
ret := make([]Field, len(names))
for i, n := range names {
ret[i] = Field{Name: n}
}

return ret
}

var (
defaultFields = []string{"timestamp", "cluster", "namespace", "app", "pod_name", "container_name", "host", "log"}
defaultFields = fieldsFromNames("timestamp", "cluster", "namespace", "app", "pod_name", "container_name", "host", "log")
defaultColumns = "timestamp, cluster, namespace, app, pod_name, container_name, host, fields_string, fields_number, log"

fieldsMetric = promauto.NewCounterVec(prometheus.CounterOpts{
Expand Down Expand Up @@ -138,7 +152,7 @@ func handleConditionParts(key, value, operator string, materializedColumns []str
// ALTER TABLE logs.logs ON CLUSTER '{cluster}' ADD COLUMN <FIELD> Float64 DEFAULT fields_number['<FIELD>'];
fieldsMetric.WithLabelValues(key).Inc()

if contains(defaultFields, key) || contains(materializedColumns, key) {
if containsField(defaultFields, Field{Name: key}) || contains(materializedColumns, key) {
if operator == "=~" {
return fmt.Sprintf("%s ILIKE %s", key, value), nil
}
Expand Down Expand Up @@ -186,7 +200,7 @@ func handleConditionParts(key, value, operator string, materializedColumns []str
}

func handleExistsCondition(key string, materializedColumns []string) string {
if contains(defaultFields, key) || contains(materializedColumns, key) {
if containsField(defaultFields, Field{Name: key}) || contains(materializedColumns, key) {
return fmt.Sprintf("%s IS NOT NULL", key)
}

Expand All @@ -205,7 +219,7 @@ func parseOrder(order, orderBy string, materializedColumns []string) string {
}

orderBy = strings.TrimSpace(orderBy)
if contains(defaultFields, orderBy) || contains(materializedColumns, orderBy) {
if containsField(defaultFields, Field{Name: orderBy}) || contains(materializedColumns, orderBy) {
return fmt.Sprintf("%s %s", orderBy, order)
}

Expand All @@ -226,25 +240,37 @@ func getBucketTimes(interval, bucketTime, timeStart, timeEnd int64) (int64, int6
return bucketTime, bucketTime + interval
}

// appendIfMissing appends a value to a slice, when this values doesn't exist in the slice already.
func appendIfMissing(items []string, item string) []string {
// appendIf appends a value to a slice, when the predicate returns true
func appendIf[T any](items []T, item T, predecate func(iter, newItem T) bool) []T {
for _, ele := range items {
if ele == item {
if predecate(ele, item) {
return items
}
}

return append(items, item)
}

// contains checks if the given slice of string contains the given item. It returns true when the slice contains the
// given item.
func contains(items []string, item string) bool {
func appendIfFieldIsMissing(items []Field, item Field) []Field {
return appendIf(items, item, func(a, b Field) bool { return a.Name == b.Name })
}

func some[T any](items []T, item T, predecate func(a, b T) bool) bool {
for _, ele := range items {
if ele == item {
if predecate(ele, item) {
return true
}
}

return false
}

// contains checks if the given slice of string contains the given item. It returns true when the slice contains the
// given item.
func contains(items []string, item string) bool {
return some(items, item, func(a, b string) bool { return a == b })
}

func containsField(items []Field, item Field) bool {
return some(items, item, func(a, b Field) bool { return a.Name == b.Name })
}
38 changes: 31 additions & 7 deletions plugins/plugin-klogs/pkg/instance/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,39 @@ func TestGetInterval(t *testing.T) {
}
}

func TestAppendIfMissing(t *testing.T) {
items := []string{"foo", "bar"}
func TestAppendIf(t *testing.T) {
t.Run("works with strings", func(t *testing.T) {
strCmpr := func(a, b string) bool { return a == b }
items := []string{"foo", "bar"}

items = appendIf(items, "foo", strCmpr)
require.Equal(t, []string{"foo", "bar"}, items)

items = appendIf(items, "hello", strCmpr)
items = appendIf(items, "world", strCmpr)
require.Equal(t, []string{"foo", "bar", "hello", "world"}, items)
})

t.Run("works with int's", func(t *testing.T) {
appendIfGreater := func(items []int, item int) []int {
return appendIf(items, item, func(a, b int) bool { return a > b })
}

require.Equal(t, []int{1, 2, 3}, appendIfGreater([]int{1, 2, 3}, 0))
require.Equal(t, []int{1, 2, 3, 100}, appendIfGreater([]int{1, 2, 3}, 100))
})
}

func TestSome(t *testing.T) {
items := []int64{1, 2, 3}

items = appendIfMissing(items, "foo")
require.Equal(t, []string{"foo", "bar"}, items)
require.True(t, some(items, 12, func(a, b int64) bool {
return a%10 == b%10
}))

items = appendIfMissing(items, "hello")
items = appendIfMissing(items, "world")
require.Equal(t, []string{"foo", "bar", "hello", "world"}, items)
require.False(t, some(items, 14, func(a, b int64) bool {
return a%10 == b%10
}))
}

func TestContains(t *testing.T) {
Expand Down
55 changes: 34 additions & 21 deletions plugins/plugin-klogs/pkg/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,36 @@ import (
"go.uber.org/zap"
)

type IntegrationAutolinks struct {
ColumnName string `json:"columnName"`
Path string `json:"path"`
}

type Integration struct {
Type string `json:"type"`
Autolinks IntegrationAutolinks
}

// Config is the structure of the configuration for a single klogs instance.
type Config struct {
Address string `json:"address"`
Database string `json:"database"`
Username string `json:"username"`
Password string `json:"password"`
DialTimeout string `json:"dialTimeout"`
ConnMaxLifetime string `json:"connMaxLifetime"`
MaxIdleConns int `json:"maxIdleConns"`
MaxOpenConns int `json:"maxOpenConns"`
MaterializedColumns []string `json:"materializedColumns"`
Address string `json:"address"`
Database string `json:"database"`
Username string `json:"username"`
Password string `json:"password"`
DialTimeout string `json:"dialTimeout"`
ConnMaxLifetime string `json:"connMaxLifetime"`
MaxIdleConns int `json:"maxIdleConns"`
MaxOpenConns int `json:"maxOpenConns"`
MaterializedColumns []string `json:"materializedColumns"`
Integrations []Integration `json:"integration"`
}

type Instance interface {
GetName() string
getFields(ctx context.Context) (Fields, error)
refreshCachedFields() []string
GetFields(filter string, fieldType string) []string
GetLogs(ctx context.Context, query, order, orderBy string, limit, timeStart, timeEnd int64) ([]map[string]any, []string, int64, int64, []Bucket, error)
GetFields(filter string, fieldType string) []Field
GetLogs(ctx context.Context, query, order, orderBy string, limit, timeStart, timeEnd int64) ([]map[string]any, []Field, int64, int64, []Bucket, error)
GetRawQueryResults(ctx context.Context, query string) ([][]any, []string, error)
GetAggregation(ctx context.Context, aggregation Aggregation) ([]map[string]any, []string, error)
}
Expand All @@ -44,6 +55,7 @@ type instance struct {
client *sql.DB
materializedColumns []string
cachedFields Fields
integrations []Integration
}

func (i *instance) GetName() string {
Expand All @@ -62,16 +74,16 @@ func (i *instance) getFields(ctx context.Context) (Fields, error) {
defer rowsFieldKeys.Close()

for rowsFieldKeys.Next() {
var field string
var f Field

if err := rowsFieldKeys.Scan(&field); err != nil {
if err := rowsFieldKeys.Scan(&f.Name); err != nil {
return fields, err
}

if fieldType == "string" {
fields.String = append(fields.String, field)
fields.String = append(fields.String, f)
} else if fieldType == "number" {
fields.Number = append(fields.Number, field)
fields.Number = append(fields.Number, f)
}
}

Expand Down Expand Up @@ -113,24 +125,24 @@ func (i *instance) refreshCachedFields() []string {
log.Info(ctx, "Refreshed fields", zap.Int("stringFieldsCount", len(fields.String)), zap.Int("numberFieldsCount", len(fields.Number)))

for _, field := range fields.String {
i.cachedFields.String = appendIfMissing(i.cachedFields.String, field)
i.cachedFields.String = appendIfFieldIsMissing(i.cachedFields.String, field)
}

for _, field := range fields.Number {
i.cachedFields.Number = appendIfMissing(i.cachedFields.Number, field)
i.cachedFields.Number = appendIfFieldIsMissing(i.cachedFields.Number, field)
}
}
}
}
}

// GetFields returns all cached fields which are containing the filter term. The cached fields are refreshed every 24.
func (i *instance) GetFields(filter string, fieldType string) []string {
var fields []string
func (i *instance) GetFields(filter string, fieldType string) []Field {
var fields []Field

if fieldType == "string" || fieldType == "" {
for _, field := range i.cachedFields.String {
if strings.Contains(field, filter) {
if strings.Contains(field.Name, filter) {
fields = append(fields, field)
}
}
Expand All @@ -140,7 +152,7 @@ func (i *instance) GetFields(filter string, fieldType string) []string {

if fieldType == "number" || fieldType == "" {
for _, field := range i.cachedFields.Number {
if strings.Contains(field, filter) {
if strings.Contains(field.Name, filter) {
fields = append(fields, field)
}
}
Expand Down Expand Up @@ -238,6 +250,7 @@ func New(name string, options map[string]any) (Instance, error) {
database: config.Database,
client: client,
materializedColumns: config.MaterializedColumns,
integrations: config.Integrations,
}

go instance.refreshCachedFields()
Expand Down
Loading