-
Notifications
You must be signed in to change notification settings - Fork 29
Write kafka key to partition #36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
sink/kinesis.go
Outdated
|
||
// Put .. | ||
func (s *KinesisSink) Put(data []byte) error { | ||
func (s *KinesisSink) Put(key string, data []byte) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the sake of consistency, should we use key
as the PartitionKey
below (unless $SINK_KINESIS_PARTITION_KEY
is set; see my note on #33)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will do
sink/stdout.go
Outdated
func (s *StdoutSink) Put(data []byte) error { | ||
fmt.Println(string(data)) | ||
func (s *StdoutSink) Put(key string, data []byte) error { | ||
fmt.Printf("key: %s, message: %s\n", key, string(data)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this could break anything that expects to consume the stdout sink as JSON-encoded log lines.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok will remove, though it's helpful to print the key for testing purposes
sink/kafka.go
Outdated
func (s *KafkaSink) Put(data []byte) error { | ||
func (s *KafkaSink) Put(key string, data []byte) error { | ||
s.putCh <- data | ||
s.key = key |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure this is correct; you're sending the data through the channel putCh
but mutating the sink to set the key, which I think will be incorrect. I think putCh
will need to be modified to take both (as a struct, maybe) data []byte
and also key []string
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm, you might be right. but it is working currently, i'll see what @jippi or @josegonzalez say.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wyattanderson updated
Start() error | ||
Stop() | ||
Put(data []byte) error | ||
Put(key string, data []byte) error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For consistency's sake, I'd maybe have both key
and data
be of type []byte
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the IDs in the nomad structs are all string types, so then I would have to convert them all to []bytes.
addressed some comments |
LGTM |
Having this code merged would be a godsend. Are there any reasons not to merge this PR? |
Modifies structs to take in a string 'key' (in addition to the data []byte) so the commands (allocation/jobs/nodes) can write the IDs to the key for kafka absolute order partitioning.
should have no effect on other sinks, as the key argument is ignored in the Put().
tested locally.