Skip to content

Conversation

ckong316
Copy link
Contributor

Modifies structs to take in a string 'key' (in addition to the data []byte) so the commands (allocation/jobs/nodes) can write the IDs to the key for kafka absolute order partitioning.

should have no effect on other sinks, as the key argument is ignored in the Put().

tested locally.

@ckong316 ckong316 mentioned this pull request Nov 26, 2018
sink/kinesis.go Outdated

// Put ..
func (s *KinesisSink) Put(data []byte) error {
func (s *KinesisSink) Put(key string, data []byte) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the sake of consistency, should we use key as the PartitionKey below (unless $SINK_KINESIS_PARTITION_KEY is set; see my note on #33)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do

sink/stdout.go Outdated
func (s *StdoutSink) Put(data []byte) error {
fmt.Println(string(data))
func (s *StdoutSink) Put(key string, data []byte) error {
fmt.Printf("key: %s, message: %s\n", key, string(data))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could break anything that expects to consume the stdout sink as JSON-encoded log lines.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok will remove, though it's helpful to print the key for testing purposes

sink/kafka.go Outdated
func (s *KafkaSink) Put(data []byte) error {
func (s *KafkaSink) Put(key string, data []byte) error {
s.putCh <- data
s.key = key
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this is correct; you're sending the data through the channel putCh but mutating the sink to set the key, which I think will be incorrect. I think putCh will need to be modified to take both (as a struct, maybe) data []byte and also key []string.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm, you might be right. but it is working currently, i'll see what @jippi or @josegonzalez say.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wyattanderson updated

Start() error
Stop()
Put(data []byte) error
Put(key string, data []byte) error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency's sake, I'd maybe have both key and data be of type []byte.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the IDs in the nomad structs are all string types, so then I would have to convert them all to []bytes.

@ckong316
Copy link
Contributor Author

addressed some comments

@wyattanderson
Copy link
Contributor

LGTM

@rostow
Copy link

rostow commented Feb 27, 2023

Having this code merged would be a godsend. Are there any reasons not to merge this PR?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants