Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/cmd/producerList.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ var producerListCmd = &cobra.Command{
fmt.Printf("%sKafka%s (--output = kafka)\n", Green, Reset)
fmt.Printf("%sHTTP%s (--output = http)\n", Green, Reset)
fmt.Printf("%sRedis%s (--output = redis)\n", Green, Reset)
fmt.Printf("%sRedis HASH%s (--output = redishash)\n", Green, Reset)
fmt.Printf("%sRedis JSON%s (--output = redisjson)\n", Green, Reset)
fmt.Printf("%sMongodb%s (--output = mongo)\n", Green, Reset)
fmt.Printf("%sElastic%s (--output = elastic)\n", Green, Reset)
fmt.Printf("%sS3%s (--output = s3)\n", Green, Reset)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/templateRun.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func init() {
templateRunCmd.Flags().StringP("topic", "t", constants.DEFAULT_TOPIC, "Kafka topic")

templateRunCmd.Flags().Bool("kcat", false, "If you want to pipe jr with kcat, use this flag: it is equivalent to --output stdout --outputTemplate '{{key}},{{value}}' --oneline")
templateRunCmd.Flags().StringP("output", "o", constants.DEFAULT_OUTPUT, "can be one of stdout, kafka, http, redis, mongo, elastic, s3, gcs, azblobstorage, azcosmosdb, cassandra, luascript, wasm, awsdynamodb")
templateRunCmd.Flags().StringP("output", "o", constants.DEFAULT_OUTPUT, "can be one of stdout, kafka, http, redis, redishash, redisjson, mongo, elastic, s3, gcs, azblobstorage, azcosmosdb, cassandra, luascript, wasm, awsdynamodb")
templateRunCmd.Flags().String("outputTemplate", constants.DEFAULT_OUTPUT_TEMPLATE, "Formatting of K,V on standard output")
templateRunCmd.Flags().BoolP("oneline", "l", false, "strips /n from output, for example to be pipelined to tools like kcat")
templateRunCmd.Flags().BoolP("autocreate", "a", false, "if enabled, autocreate topics")
Expand Down
26 changes: 26 additions & 0 deletions pkg/emitter/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,16 @@ func (e *Emitter) Initialize(ctx context.Context, conf configuration.GlobalConfi
return
}

if e.Output == "redishash" {
e.Producer = createRedisHashProducer(ctx, conf.RedisTtl, conf.RedisConfig)
return
}

if e.Output == "redisjson" {
e.Producer = createRedisJSONProducer(ctx, conf.RedisTtl, conf.RedisConfig)
return
}

if e.Output == "mongo" || e.Output == "mongodb" {
e.Producer = createMongoProducer(ctx, conf.MongoConfig)
return
Expand Down Expand Up @@ -213,6 +223,22 @@ func createRedisProducer(_ context.Context, ttl time.Duration, redisConfig strin
return rProducer
}

func createRedisHashProducer(_ context.Context, ttl time.Duration, redisConfig string) Producer {
rProducer := &redis.HashProducer{
Ttl: ttl,
}
rProducer.Initialize(redisConfig)
return rProducer
}

func createRedisJSONProducer(_ context.Context, ttl time.Duration, redisConfig string) Producer {
rProducer := &redis.JSONProducer{
Ttl: ttl,
}
rProducer.Initialize(redisConfig)
return rProducer
}

func createMongoProducer(ctx context.Context, mongoConfig string) Producer {
mProducer := &mongodb.MongoProducer{}
mProducer.Initialize(ctx, mongoConfig)
Expand Down
63 changes: 63 additions & 0 deletions pkg/producers/redis/hashProducer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package redis

import (
"context"
"encoding/json"
"os"
"time"

"github.com/redis/go-redis/v9"
"github.com/rs/zerolog/log"
)

type HashProducer struct {
client redis.Client
Ttl time.Duration
}

func (p *HashProducer) Initialize(configFile string) {
var options redis.Options

data, err := os.ReadFile(configFile)
if err != nil {
log.Fatal().Err(err).Msg("Failed to load Redis configFile")
}

err = json.Unmarshal(data, &options)
if err != nil {
log.Fatal().Err(err).Msg("Failed to parse configuration parameters")
}

p.client = *redis.NewClient(&options)
}

func (p *HashProducer) Close(_ context.Context) error {
err := p.client.Close()
if err != nil {
log.Warn().Err(err).Msg("Failed to close Redis connection")
}
return err
}

func (p *HashProducer) Produce(ctx context.Context, k []byte, v []byte, _ any) {
// Parse the JSON value into a map
var fields map[string]interface{}
err := json.Unmarshal(v, &fields)
if err != nil {
log.Fatal().Err(err).Msg("Failed to unmarshal JSON into hash fields")
}

// Use HSet to set multiple hash fields at once
err = p.client.HSet(ctx, string(k), fields).Err()
if err != nil {
log.Fatal().Err(err).Msg("Failed to write hash data to Redis")
}

// Set TTL if specified
if p.Ttl > 0 {
err = p.client.Expire(ctx, string(k), p.Ttl).Err()
if err != nil {
log.Fatal().Err(err).Msg("Failed to set TTL on Redis hash")
}
}
}
82 changes: 82 additions & 0 deletions pkg/producers/redis/hashProducer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
//go:build exclude

// Copyright © 2024 JR team
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package redis

import (
"context"
"encoding/json"
"testing"
"time"

"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
)

func TestHashProducer_Initialize(t *testing.T) {
configFile := "config.json.example"

producer := &HashProducer{}
producer.Initialize(configFile)

assert.NotNil(t, producer.client, "Redis client should be initialized")
}

func TestHashProducer_Produce(t *testing.T) {
configFile := "config.json.example"

producer := &HashProducer{
Ttl: time.Minute,
}
producer.Initialize(configFile)

ctx := context.Background()
key := "test_hash_key"
value := map[string]interface{}{
"field1": "value1",
"field2": "value2",
}
valueBytes, _ := json.Marshal(value)

producer.Produce(ctx, []byte(key), valueBytes, nil)

// Verify the data in Redis
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379", // Adjust as necessary
})
defer client.Close()

result, err := client.HGetAll(ctx, key).Result()
assert.NoError(t, err, "Should not error when getting hash from Redis")
assert.Equal(t, "value1", result["field1"], "Field1 should match")
assert.Equal(t, "value2", result["field2"], "Field2 should match")
}

func TestHashProducer_Close(t *testing.T) {
configFile := "config.json.example"

producer := &HashProducer{}
producer.Initialize(configFile)

err := producer.Close(context.Background())
assert.NoError(t, err, "Should not error when closing Redis connection")
}
63 changes: 63 additions & 0 deletions pkg/producers/redis/jsonProducer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package redis

import (
"context"
"encoding/json"
"os"
"time"

"github.com/redis/go-redis/v9"
"github.com/rs/zerolog/log"
)

type JSONProducer struct {
client redis.Client
Ttl time.Duration
}

func (p *JSONProducer) Initialize(configFile string) {
var options redis.Options

data, err := os.ReadFile(configFile)
if err != nil {
log.Fatal().Err(err).Msg("Failed to load Redis configFile")
}

err = json.Unmarshal(data, &options)
if err != nil {
log.Fatal().Err(err).Msg("Failed to parse configuration parameters")
}

p.client = *redis.NewClient(&options)
}

func (p *JSONProducer) Close(_ context.Context) error {
err := p.client.Close()
if err != nil {
log.Warn().Err(err).Msg("Failed to close Redis connection")
}
return err
}

func (p *JSONProducer) Produce(ctx context.Context, k []byte, v []byte, _ any) {
// Verify the input is valid JSON
var jsonData interface{}
err := json.Unmarshal(v, &jsonData)
if err != nil {
log.Fatal().Err(err).Msg("Failed to validate JSON data")
}

// Use JSON.SET to store the JSON document
err = p.client.Do(ctx, "JSON.SET", string(k), "$", string(v)).Err()
if err != nil {
log.Fatal().Err(err).Msg("Failed to write JSON data to Redis")
}

// Set TTL if specified
if p.Ttl > 0 {
err = p.client.Expire(ctx, string(k), p.Ttl).Err()
if err != nil {
log.Fatal().Err(err).Msg("Failed to set TTL on Redis key")
}
}
}
94 changes: 94 additions & 0 deletions pkg/producers/redis/jsonProducer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
//go:build exclude

// Copyright © 2024 JR team
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package redis

import (
"context"
"encoding/json"
"testing"
"time"

"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
)

func TestJSONProducer_Initialize(t *testing.T) {
configFile := "config.json.example"

producer := &JSONProducer{}
producer.Initialize(configFile)

assert.NotNil(t, producer.client, "Redis client should be initialized")
}

func TestJSONProducer_Produce(t *testing.T) {
configFile := "config.json.example"

producer := &JSONProducer{
Ttl: time.Minute,
}
producer.Initialize(configFile)

ctx := context.Background()
key := "test_json_key"
// Create a test JSON object with nested structures
testJSON := `{
"id": "2210",
"user": {
"name": "Foogaro",
"year": 1978,
"email": "luigi@foogaro.com"
}
}`
producer.Produce(ctx, []byte(key), []byte(testJSON), nil)

// Verify the data in Redis
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379", // Adjust as necessary
})
defer client.Close()

// Use JSON.GET to retrieve the stored JSON
result, err := client.Do(ctx, "JSON.GET", key, "$").Text()
assert.NoError(t, err, "Should not error when getting JSON from Redis")

// Compare the JSON strings (after normalizing them)
var expected, actual interface{}
err = json.Unmarshal([]byte(testJSON), &expected)
assert.NoError(t, err, "Should parse expected JSON")

err = json.Unmarshal([]byte(result), &actual)
assert.NoError(t, err, "Should parse actual JSON")

assert.Equal(t, expected, actual, "Stored JSON should match original")
}

func TestJSONProducer_Close(t *testing.T) {
configFile := "config.json.example"

producer := &JSONProducer{}
producer.Initialize(configFile)

err := producer.Close(context.Background())
assert.NoError(t, err, "Should not error when closing Redis connection")
}
Loading