Skip to content

Commit 2027d61

Browse files
authored
numa-aware EventLoopGroup (#220)
also, latest submodules
1 parent 6984349 commit 2027d61

File tree

11 files changed

+116
-9
lines changed

11 files changed

+116
-9
lines changed

awscrt/common.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
"""
2+
Cross-platform library for `awscrt`.
3+
"""
4+
import _awscrt
5+
from typing import List
6+
7+
8+
def get_cpu_group_count() -> int:
9+
"""
10+
Returns number of processor groups on the system.
11+
12+
Useful for working with non-uniform memory access (NUMA) nodes.
13+
"""
14+
return _awscrt.get_cpu_group_count()
15+
16+
17+
def get_cpu_count_for_group(group_idx: int) -> int:
18+
"""
19+
Returns number of processors in a given group.
20+
"""
21+
return _awscrt.get_cpu_count_for_group(group_idx)

awscrt/io.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,13 @@ class EventLoopGroup(NativeResource):
4646
need to do async work will ask the EventLoopGroup for an event-loop to use.
4747
4848
Args:
49-
num_threads (int): Number of event-loops to create.
50-
Pass 0 to create one for each processor on the machine.
49+
num_threads (Optional[int]): Maximum number of event-loops to create.
50+
If unspecified, one is created for each processor on the machine.
51+
52+
cpu_group (Optional[int]): Optional processor group to which all
53+
threads will be pinned. Useful for systems with non-uniform
54+
memory access (NUMA) nodes. If specified, the number of threads
55+
will be capped at the number of processors in the group.
5156
5257
Attributes:
5358
shutdown_event (threading.Event): Signals when EventLoopGroup's threads
@@ -57,16 +62,26 @@ class EventLoopGroup(NativeResource):
5762

5863
__slots__ = ('shutdown_event')
5964

60-
def __init__(self, num_threads=0):
65+
def __init__(self, num_threads=None, cpu_group=None):
6166
super().__init__()
6267

68+
if num_threads is None:
69+
# C uses 0 to indicate defaults
70+
num_threads = 0
71+
72+
if cpu_group is None:
73+
is_pinned = False
74+
cpu_group = 0
75+
else:
76+
is_pinned = True
77+
6378
shutdown_event = threading.Event()
6479

6580
def on_shutdown():
6681
shutdown_event.set()
6782

6883
self.shutdown_event = shutdown_event
69-
self._binding = _awscrt.event_loop_group_new(num_threads, on_shutdown)
84+
self._binding = _awscrt.event_loop_group_new(num_threads, is_pinned, cpu_group, on_shutdown)
7085

7186

7287
class HostResolverBase(NativeResource):

crt/aws-c-common

source/common.c

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/**
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
#include "common.h"
6+
7+
#include <aws/common/system_info.h>
8+
9+
PyObject *aws_py_get_cpu_group_count(PyObject *self, PyObject *args) {
10+
(void)self;
11+
(void)args;
12+
uint16_t count = aws_get_cpu_group_count();
13+
return PyLong_FromUnsignedLong(count);
14+
}
15+
16+
PyObject *aws_py_get_cpu_count_for_group(PyObject *self, PyObject *args) {
17+
(void)self;
18+
19+
uint16_t group_idx;
20+
if (!PyArg_ParseTuple(args, "H", &group_idx)) {
21+
return NULL;
22+
}
23+
24+
size_t count = aws_get_cpu_count_for_group(group_idx);
25+
return PyLong_FromSize_t(count);
26+
}

source/common.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#ifndef AWS_CRT_PYTHON_COMMON_H
2+
#define AWS_CRT_PYTHON_COMMON_H
3+
/**
4+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
* SPDX-License-Identifier: Apache-2.0.
6+
*/
7+
8+
/**
9+
* This file includes definitions for common aws-c-common functions.
10+
*/
11+
12+
#include "module.h"
13+
14+
PyObject *aws_py_get_cpu_group_count(PyObject *self, PyObject *args);
15+
PyObject *aws_py_get_cpu_count_for_group(PyObject *self, PyObject *args);
16+
17+
#endif /* AWS_CRT_PYTHON_COMMON_H */

source/io.c

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,10 @@ PyObject *aws_py_event_loop_group_new(PyObject *self, PyObject *args) {
131131
struct aws_allocator *allocator = aws_py_get_allocator();
132132

133133
uint16_t num_threads;
134+
int is_pinned;
135+
uint16_t cpu_group;
134136
PyObject *shutdown_complete_py;
135-
if (!PyArg_ParseTuple(args, "HO", &num_threads, &shutdown_complete_py)) {
137+
if (!PyArg_ParseTuple(args, "HpHO", &num_threads, &is_pinned, &cpu_group, &shutdown_complete_py)) {
136138
return NULL;
137139
}
138140

@@ -146,7 +148,12 @@ PyObject *aws_py_event_loop_group_new(PyObject *self, PyObject *args) {
146148
.shutdown_callback_user_data = binding,
147149
};
148150

149-
binding->native = aws_event_loop_group_new_default(allocator, num_threads, &shutdown_options);
151+
if (is_pinned) {
152+
binding->native =
153+
aws_event_loop_group_new_default_pinned_to_cpu_group(allocator, num_threads, cpu_group, &shutdown_options);
154+
} else {
155+
binding->native = aws_event_loop_group_new_default(allocator, num_threads, &shutdown_options);
156+
}
150157
if (binding->native == NULL) {
151158
PyErr_SetAwsLastError();
152159
goto elg_init_failed;

source/module.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "module.h"
66

77
#include "auth.h"
8+
#include "common.h"
89
#include "crypto.h"
910
#include "event_stream.h"
1011
#include "http.h"
@@ -477,6 +478,8 @@ static PyMethodDef s_module_methods[] = {
477478
AWS_PY_METHOD_DEF(get_error_name, METH_VARARGS),
478479
AWS_PY_METHOD_DEF(get_error_message, METH_VARARGS),
479480
AWS_PY_METHOD_DEF(get_corresponding_builtin_exception, METH_VARARGS),
481+
AWS_PY_METHOD_DEF(get_cpu_group_count, METH_VARARGS),
482+
AWS_PY_METHOD_DEF(get_cpu_count_for_group, METH_VARARGS),
480483

481484
/* IO */
482485
AWS_PY_METHOD_DEF(is_alpn_available, METH_NOARGS),

test/test_common.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0.
3+
4+
from test import NativeResourceTest
5+
from awscrt.common import *
6+
7+
8+
class TestSystemInfo(NativeResourceTest):
9+
def test_get_cpu_group_count(self):
10+
self.assertGreater(get_cpu_group_count(), 0)
11+
12+
def test_get_cpu_count_for_group(self):
13+
group_count = get_cpu_group_count()
14+
for group_i in range(group_count):
15+
self.assertGreater(get_cpu_count_for_group(group_i), 0)

0 commit comments

Comments
 (0)