Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
lib
lib
__pycache__/
*.pyc
*.pyo
315 changes: 215 additions & 100 deletions test_ratelimit.py
Original file line number Diff line number Diff line change
@@ -1,126 +1,241 @@
#
# Test ratelimit fields
# Test ratelimit fields according to latest draft syntax
#
# Updated to use the new RateLimit-Policy and RateLimit header syntax:
# - RateLimit-Policy: "name";q=quota;w=window;pk=:partition_key:;qu="unit"
# - RateLimit: "name";r=remaining;t=time_to_reset;pk=:partition_key:
#
import http_sfv


def test_policy():
def test_ratelimit_policy():
# Test RateLimit-Policy header format according to latest draft
policies = [
"1000;w=3600,5000;w=86400",
"100;w=60",
"10;w=1, 50;w=60, 1000;w=3600, 5000;w=86400",
"10;w=1;burst=1000, 1000;w=3600",
'"hourly";q=1000;w=3600,"daily";q=5000;w=86400',
'"permin";q=100;w=60',
'"fast";q=10;w=1,"min";q=50;w=60,"hour";q=1000;w=3600,"day";q=5000;w=86400',
'"burst";q=10;w=1;burst=1000,"hourly";q=1000;w=3600',
]
for p in policies:
l = http_sfv.List()
l.parse(p.encode())
for i in l:
assert i.value
assert "w" in i.params
print("value: ", i.value, "params:", dict(i.params))


def test_all_in_one():
values = [
(
"limit=10, remaining=10, reset=4,"
"""policy=(10;w=1 50;w=60 1000;w=3600 5000;w=86400)"""
),
]
for v in values:
d = http_sfv.Dictionary()
d.parse(v.encode())
for k, v in d.items():
print("parameter:", k, "value:", v)
assert k
assert v
if hasattr(v, "__iter__"):
for i in v:
assert i.value
assert "w" in i.params
print("value: ", i.value, "params:", dict(i.params))
raise NotImplementedError


def find_quota_policy(policies, limit):
assert i.value # policy name
assert "q" in i.params # quota
assert "w" in i.params # window
print("policy: ", i.value, "params:", dict(i.params))


def test_ratelimit_fields():
# Test both RateLimit-Policy and RateLimit headers together
# According to latest draft syntax
policy_header = '"default";q=10;w=1,"hourly";q=50;w=60,"daily";q=1000;w=3600,"monthly";q=5000;w=86400'
ratelimit_header = '"default";r=10;t=4'

# Parse RateLimit-Policy header
policy = http_sfv.List()
policy.parse(policy_header.encode())

# Parse RateLimit header
ratelimit = http_sfv.List()
ratelimit.parse(ratelimit_header.encode())

# Verify policy structure
for policy_item in policy:
assert policy_item.value # policy name
assert "q" in policy_item.params # quota
assert "w" in policy_item.params # window
print("policy:", policy_item.value, "quota:", policy_item.params["q"], "window:", policy_item.params["w"])

# Verify ratelimit structure
for ratelimit_item in ratelimit:
assert ratelimit_item.value # policy name
assert "r" in ratelimit_item.params # remaining
assert "t" in ratelimit_item.params # time to reset
print("ratelimit:", ratelimit_item.value, "remaining:", ratelimit_item.params["r"], "reset:", ratelimit_item.params["t"])


def find_policy_by_name(policies, policy_name):
"""Find a policy by its name from a list of policies"""
for policy in policies:
if policy.value == limit:
if policy.value == policy_name:
return policy.params
return {}


def parse_fields(headers):
# 27 µs ± 499 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
limit = http_sfv.Item()
limit.parse(headers["limit"].encode())

policies = http_sfv.List()
policies.parse(headers["policy"].encode())
quota_policy = find_quota_policy(policies, limit.value)
return limit, policies, quota_policy


def parse_fields_token(headers):
# 24.2 µs ± 506 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
limit = http_sfv.Item()
limit.parse(headers["limit"].encode())

policies = http_sfv.Dictionary()
policies.parse(headers["policy"].encode())
quota_policy = policies[f"q{limit.value}"]
return limit, policies, quota_policy.params


def parse_fields_int(headers):
# 24.5 µs ± 297 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
limit = int(headers["limit"])
def parse_ratelimit_headers(headers):
"""Parse RateLimit-Policy and RateLimit headers according to latest draft"""
# Parse RateLimit-Policy header
policies = http_sfv.List()
policies.parse(headers["policy"].encode())
quota_policy = find_quota_policy(policies, limit)
return limit, policies, quota_policy


def test_get_policy():
# To retreive the policy, I need to iterate over the list.
policies.parse(headers["RateLimit-Policy"].encode())

# Parse RateLimit header
ratelimit = http_sfv.List()
ratelimit.parse(headers["RateLimit"].encode())

# Get the current policy name from the RateLimit header
current_policy_name = ratelimit[0].value if ratelimit else None

# Find the matching policy
policy_params = find_policy_by_name(policies, current_policy_name) if current_policy_name else {}

return policies, ratelimit, policy_params


def test_quota_units_example():
"""Test RateLimit headers with quota units according to latest draft"""
headers = dict(
**{"RateLimit-Policy": '"bandwidth";q=65535;qu="content-bytes";w=10;pk=:sdfjLJUOUH==:'},
**{"RateLimit": '"bandwidth";r=30000;t=5;pk=:sdfjLJUOUH==:'}
)

policies, ratelimit, policy_params = parse_ratelimit_headers(headers)

# Verify policy structure with quota units
policy = policies[0]
assert policy.value == "bandwidth"
assert policy.params["q"] == 65535
assert policy.params["qu"] == "content-bytes" # quota unit
assert policy.params["w"] == 10

# Verify ratelimit structure
current = ratelimit[0]
assert current.value == "bandwidth"
assert current.params["r"] == 30000 # remaining bytes
assert current.params["t"] == 5

print("policy:", policy.value, "quota:", policy.params["q"], "unit:", policy.params["qu"], "window:", policy.params["w"])
print("ratelimit:", current.value, "remaining:", current.params["r"], "reset:", current.params["t"])


def test_multiple_policies_example():
"""Test multiple policies as shown in draft examples"""
# Example from draft: RateLimit-Policy: "permin";q=50;w=60,"perhr";q=1000;w=3600
headers = dict(
**{"RateLimit-Policy": '"permin";q=50;w=60,"perhr";q=1000;w=3600'},
**{"RateLimit": '"permin";r=25;t=45'}
)

policies, ratelimit, policy_params = parse_ratelimit_headers(headers)

# Should have 2 policies
assert len(policies) == 2

# Verify first policy
assert policies[0].value == "permin"
assert policies[0].params["q"] == 50
assert policies[0].params["w"] == 60

# Verify second policy
assert policies[1].value == "perhr"
assert policies[1].params["q"] == 1000
assert policies[1].params["w"] == 3600

# Verify current ratelimit refers to permin policy
assert ratelimit[0].value == "permin"
assert ratelimit[0].params["r"] == 25
assert ratelimit[0].params["t"] == 45

print("Multiple policies test passed - found", len(policies), "policies")


def test_basic_policy_example():
"""Test basic policy example from draft"""
# Example from draft: RateLimit-Policy: "default";q=100;w=10
headers = dict(
**{"RateLimit-Policy": '"default";q=100;w=10'},
**{"RateLimit": '"default";r=50;t=30'}
)

policies, ratelimit, policy_params = parse_ratelimit_headers(headers)

# Verify policy
policy = policies[0]
assert policy.value == "default"
assert policy.params["q"] == 100
assert policy.params["w"] == 10

# Verify ratelimit
current = ratelimit[0]
assert current.value == "default"
assert current.params["r"] == 50
assert current.params["t"] == 30

print("Basic policy test passed")


def test_policy_lookup():
"""Test retrieving a specific policy from RateLimit-Policy and RateLimit headers"""
headers = dict(
policy=(
"""10;w=1,"""
"""50;w=60,"""
"""1000;w=3600;comment="foo", """
"""5000;w=86400"""
),
limit="""1000""",
**{"RateLimit-Policy": (
'"fast";q=10;w=1,'
'"permin";q=50;w=60,'
'"hourly";q=1000;w=3600;comment="primary", '
'"daily";q=5000;w=86400'
)},
**{"RateLimit": '"hourly";r=999;t=3540'}
)

limit, policies, quota_policy = parse_fields(headers)
assert limit.value == 1000
policies, ratelimit, policy_params = parse_ratelimit_headers(headers)

# Verify we found the hourly policy
assert ratelimit[0].value == "hourly"
assert ratelimit[0].params["r"] == 999 # remaining
assert ratelimit[0].params["t"] == 3540 # time to reset

# Print all policies for verification
for policy in policies:
print("parameter:", policy.value, "value:", policy.params["w"])
print("policy:", policy.value, "quota:", policy.params["q"], "window:", policy.params["w"])

# Verify we found the matching policy parameters
assert policy_params["q"] == 1000 # quota
assert policy_params["w"] == 3600 # window


def test_get_policy_tokenized():
# To retreive the policy, I need to iterate over the list.
def test_partition_key_example():
"""Test RateLimit headers with partition key according to latest draft"""
headers = dict(
policy=(
"""q10;w=1,"""
"""q50;w=60,"""
"""q1000;w=3600;comment="foo", """
"""q5000;w=86400"""
),
limit="""1000""",
**{"RateLimit-Policy": '"peruser";q=100;w=60;pk=:cHsdsRa894==:'},
**{"RateLimit": '"peruser";r=50;t=30;pk=:cHsdsRa894==:'}
)
limit = http_sfv.Item()
limit.parse(headers["limit"].encode())
assert limit.value == "q1000"

policies = http_sfv.List()
policies.parse(headers["policy"].encode())
for policy in policies:
print("parameter:", policy.value, "value:", policy.params["w"])

# I could create a dictionary out of the list.
policies_dict = {policy.value: policy.params for policy in policies}
print("policies:", policies_dict)
print("current policy:", policies_dict[limit.value])
raise NotImplementedError

policies, ratelimit, policy_params = parse_ratelimit_headers(headers)

# Verify policy structure
policy = policies[0]
assert policy.value == "peruser"
assert policy.params["q"] == 100
assert policy.params["w"] == 60
assert policy.params["pk"] == b'p{\x1d\xb1\x16\xbc\xf7' # decoded base64

# Verify ratelimit structure
current = ratelimit[0]
assert current.value == "peruser"
assert current.params["r"] == 50
assert current.params["t"] == 30
assert current.params["pk"] == b'p{\x1d\xb1\x16\xbc\xf7' # decoded base64

print("policy:", policy.value, "quota:", policy.params["q"], "window:", policy.params["w"])
print("ratelimit:", current.value, "remaining:", current.params["r"], "reset:", current.params["t"])


# Test all functions when script is run directly
if __name__ == "__main__":
test_functions = [
test_ratelimit_policy,
test_ratelimit_fields,
test_policy_lookup,
test_partition_key_example,
test_quota_units_example,
test_multiple_policies_example,
test_basic_policy_example,
]

for test_func in test_functions:
try:
test_func()
print(f"✓ {test_func.__name__} passed")
except Exception as e:
print(f"✗ {test_func.__name__} failed: {e}")
import traceback
traceback.print_exc()