Skip to content

Commit 645eea1

Browse files
committed
Add BitField type to encode/decode at protocol layer
1 parent c45e49f commit 645eea1

File tree

7 files changed

+50
-34
lines changed

7 files changed

+50
-34
lines changed

kafka/admin/acl_resource.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
from kafka.vendor.enum34 import IntEnum
99

1010
from kafka.errors import IllegalArgumentError
11-
from kafka.util import from_32_bit_field
1211

1312

1413
class ResourceType(IntEnum):
@@ -251,5 +250,5 @@ def validate(self):
251250
)
252251

253252

254-
def valid_acl_operations(int_val):
255-
return set([ACLOperation(v) for v in from_32_bit_field(int_val) if v not in (0, 1, 2)])
253+
def valid_acl_operations(int_vals):
254+
return set([ACLOperation(v) for v in int_vals if v not in (0, 1, 2)])

kafka/protocol/admin.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from kafka.vendor.enum34 import IntEnum
99

1010
from kafka.protocol.api import Request, Response
11-
from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, Float64, CompactString, CompactArray, TaggedFields
11+
from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, Float64, CompactString, CompactArray, TaggedFields, BitField
1212

1313

1414
class CreateTopicsResponse_v0(Response):
@@ -338,7 +338,7 @@ class DescribeGroupsResponse_v3(Response):
338338
('client_host', String('utf-8')),
339339
('member_metadata', Bytes),
340340
('member_assignment', Bytes))),
341-
('authorized_operations', Int32)))
341+
('authorized_operations', BitField)))
342342
)
343343

344344

kafka/protocol/metadata.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from __future__ import absolute_import
22

33
from kafka.protocol.api import Request, Response
4-
from kafka.protocol.types import Array, Boolean, Int16, Int32, Schema, String
4+
from kafka.protocol.types import Array, Boolean, Int16, Int32, Schema, String, BitField
55

66

77
class MetadataResponse_v0(Response):
@@ -189,8 +189,8 @@ class MetadataResponse_v8(Response):
189189
('replicas', Array(Int32)),
190190
('isr', Array(Int32)),
191191
('offline_replicas', Array(Int32)))),
192-
('authorized_operations', Int32))),
193-
('authorized_operations', Int32)
192+
('authorized_operations', BitField))),
193+
('authorized_operations', BitField)
194194
)
195195

196196

kafka/protocol/types.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,3 +363,34 @@ def decode(self, data):
363363
return None
364364
return [self.array_of.decode(data) for _ in range(length)]
365365

366+
367+
class BitField(AbstractType):
368+
@classmethod
369+
def decode(cls, data):
370+
return cls.from_32_bit_field(Int32.decode(data))
371+
372+
@classmethod
373+
def encode(cls, vals):
374+
# to_32_bit_field returns unsigned val, so we need to
375+
# encode >I to avoid crash if/when byte 31 is set
376+
# (note that decode as signed still works fine)
377+
return struct.Struct('>I').pack(cls.to_32_bit_field(vals))
378+
379+
@classmethod
380+
def to_32_bit_field(cls, vals):
381+
value = 0
382+
for b in vals:
383+
assert 0 <= b < 32
384+
value |= 1 << b
385+
return value
386+
387+
@classmethod
388+
def from_32_bit_field(cls, value):
389+
result = set()
390+
count = 0
391+
while value != 0:
392+
if (value & 1) != 0:
393+
result.add(count)
394+
count += 1
395+
value = (value & 0xFFFFFFFF) >> 1
396+
return result

kafka/util.py

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -139,20 +139,3 @@ def wrapper(self, *args, **kwargs):
139139
functools.update_wrapper(wrapper, func)
140140
return wrapper
141141

142-
143-
def to_32_bit_field(vals):
144-
value = 0
145-
for b in vals:
146-
assert 0 <= b < 32
147-
value |= 1 << b
148-
return value
149-
150-
def from_32_bit_field(value):
151-
result = set()
152-
count = 0
153-
while value != 0:
154-
if (value & 1) != 0:
155-
result.add(count)
156-
count += 1
157-
value = (value & 0xFFFFFFFF) >> 1
158-
return result

test/test_protocol.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@
22
import io
33
import struct
44

5+
import pytest
6+
57
from kafka.protocol.api import RequestHeader
68
from kafka.protocol.fetch import FetchRequest, FetchResponse
79
from kafka.protocol.find_coordinator import FindCoordinatorRequest
810
from kafka.protocol.message import Message, MessageSet, PartialMessage
911
from kafka.protocol.metadata import MetadataRequest
10-
from kafka.protocol.types import Int16, Int32, Int64, String, UnsignedVarInt32, CompactString, CompactArray, CompactBytes
12+
from kafka.protocol.types import Int16, Int32, Int64, String, UnsignedVarInt32, CompactString, CompactArray, CompactBytes, BitField
1113

1214

1315
def test_create_message():
@@ -332,3 +334,11 @@ def test_compact_data_structs():
332334
assert CompactBytes.decode(io.BytesIO(b'\x01')) == b''
333335
enc = CompactBytes.encode(b'foo')
334336
assert CompactBytes.decode(io.BytesIO(enc)) == b'foo'
337+
338+
339+
@pytest.mark.parametrize(('test_set',), [
340+
(set([0, 1, 5, 10, 31]),),
341+
(set(range(32)),),
342+
])
343+
def test_bit_field(test_set):
344+
assert BitField.decode(io.BytesIO(BitField.encode(test_set))) == test_set

test/test_util.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
import pytest
55

6-
from kafka.util import ensure_valid_topic_name, from_32_bit_field, to_32_bit_field
6+
from kafka.util import ensure_valid_topic_name
77

88
@pytest.mark.parametrize(('topic_name', 'expectation'), [
99
(0, pytest.raises(TypeError)),
@@ -23,10 +23,3 @@ def test_topic_name_validation(topic_name, expectation):
2323
with expectation:
2424
ensure_valid_topic_name(topic_name)
2525

26-
27-
@pytest.mark.parametrize(('test_set',), [
28-
(set([0, 1, 5, 10, 31]),),
29-
(set(range(32)),),
30-
])
31-
def test_32_bit_field(test_set):
32-
assert from_32_bit_field(to_32_bit_field(test_set)) == test_set

0 commit comments

Comments
 (0)