Skip to content

Commit 2b056c0

Browse files
AdvaitTahilyaniZwFinkritvikrao
authored
Liveviz (#298)
* Rudimentary steps * MVP * Polished code * Fixed Reduction * Clean up * More clean-up * register liveviz automatically * Poll Mode * Examples * README * Docs * Updated ccs_server * Saturating Sum Added * add endline --------- Co-authored-by: Zane Fink <finkzane@gmail.com> Co-authored-by: Ritvik Rao <rsrao2@illinois.edu>
1 parent 95acb5b commit 2b056c0

File tree

11 files changed

+504
-13
lines changed

11 files changed

+504
-13
lines changed

charm4py/charm.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1110,6 +1110,12 @@ def CcsIsRemoteRequest(self):
11101110
def CcsSendReply(self, message):
11111111
self.lib.CcsSendReply(message)
11121112

1113+
def CcsDelayReply(self):
1114+
return self.lib.CcsDelayReply()
1115+
1116+
def CcsSendDelayedReply(self, d, message):
1117+
self.lib.CcsSendDelayedReply(d, message)
1118+
11131119
def callHandler(self, handlername, data):
11141120
if handlername in self.ccs_methods:
11151121
self.ccs_methods[handlername](data)

charm4py/charmlib/ccharm.pxd

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,13 @@ cdef extern from "ccs-server.h":
105105
ChMessageInt_t len
106106

107107
cdef extern from "conv-ccs.h":
108+
ctypedef struct CcsDelayedReply:
109+
CcsImplHeader *hdr;
108110
void CcsRegisterHandlerExt(const char *ccs_handlername, void *fn);
109111
int CcsIsRemoteRequest();
110112
void CcsSendReply(int replyLen, const void *replyData);
111-
113+
void CcsSendDelayedReply(CcsDelayedReply d,int replyLen, const void *replyData)
114+
CcsDelayedReply CcsDelayReply()
112115

113116
cdef extern from "spanningTree.h":
114117
void getPETopoTreeEdges(int pe, int rootPE, int *pes, int numpes, unsigned int bfactor,

charm4py/charmlib/charmlib_cython.pyx

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -350,9 +350,8 @@ cdef void recvRemoteMessage(void *msg) noexcept:
350350
# turn char arrays into strings
351351

352352
handler_name = incomingMsgPtr.handler_name[:handler_length].decode('utf-8')
353-
data = incomingMsgPtr.data[:data_length].decode('utf-8')
354-
355-
charm.callHandler(handler_name, data)
353+
data_bytes = incomingMsgPtr.data[:data_length]
354+
charm.callHandler(handler_name, data_bytes)
356355

357356

358357
class CharmLib(object):
@@ -909,13 +908,22 @@ class CharmLib(object):
909908
def isRemoteRequest(self):
910909
return bool(CcsIsRemoteRequest())
911910

912-
def CcsSendReply(self, str message):
913-
cdef bytes message_bytes = message.encode("utf-8")
914-
cdef const char* replyData = message_bytes
915-
916-
cdef int replyLen = len(message_bytes)
911+
def CcsSendReply(self, bytes message):
912+
cdef const char* replyData = message
913+
cdef int replyLen = len(message)
917914
CcsSendReply(replyLen, <const void*>replyData)
918915

916+
def CcsDelayReply(self):
917+
cdef CcsDelayedReply* token = <CcsDelayedReply*>malloc(sizeof(CcsDelayedReply))
918+
token[0] = CcsDelayReply()
919+
return <uintptr_t>token
920+
921+
def CcsSendDelayedReply(self, uintptr_t p, bytes msg):
922+
cdef const char* replyData = msg
923+
cdef CcsDelayedReply* token = <CcsDelayedReply*>p
924+
CcsSendDelayedReply(token[0], len(msg), <const void*>replyData)
925+
free(token)
926+
919927
def hapiAddCudaCallback(self, stream, future):
920928
if not HAVE_CUDA_BUILD:
921929
raise Charm4PyError("HAPI usage not allowed: Charm++ was not built with CUDA support")

charm4py/liveviz.py

Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
from .charm import charm, Chare, register
2+
from dataclasses import dataclass, field
3+
from collections import deque
4+
import struct
5+
from itertools import chain
6+
Reducer = charm.reducers
7+
8+
group = None
9+
10+
def viz_gather(contribs):
11+
return list(chain(*contribs))
12+
13+
def viz_gather_preprocess(data, contributor):
14+
return [data]
15+
16+
Reducer.addReducer(viz_gather, pre=viz_gather_preprocess)
17+
18+
@dataclass
19+
class Config:
20+
version: int = 1
21+
isColor: bool = True
22+
isPush: bool = True
23+
is3d: bool = False
24+
min: tuple = field(default_factory=lambda: (0.0, 0.0, 0.0))
25+
max: tuple = field(default_factory=lambda: (1.0, 1.0, 1.0))
26+
27+
def to_binary(self):
28+
# Format: int, int, int, int, [double, double, double, double, double, double]
29+
binary_data = struct.pack(">iiii",
30+
self.version,
31+
1 if self.isColor else 0,
32+
1 if self.isPush else 0,
33+
1 if self.is3d else 0)
34+
if self.is3d:
35+
binary_data += struct.pack(">dddddd",
36+
self.min[0], self.min[1], self.min[2],
37+
self.max[0], self.max[1], self.max[2])
38+
return binary_data
39+
40+
class Vector3d:
41+
def __init__(self, x=0.0, y=0.0, z=0.0):
42+
self.x = x
43+
self.y = y
44+
self.z = z
45+
46+
@classmethod
47+
def from_bytes(cls, data, offset=0):
48+
# Read 3 doubles from the data starting at offset
49+
x, y, z = struct.unpack_from(">ddd", data, offset)
50+
return cls(x, y, z), offset + 24 # 24 = 3 * 8 bytes (double)
51+
52+
class ImageRequest:
53+
def __init__(self, version, request_type, width, height,
54+
x=None, y=None, z=None, o=None, minZ=0.0, maxZ=0.0):
55+
self.version = version
56+
self.request_type = request_type
57+
self.width = width
58+
self.height = height
59+
self.x = x
60+
self.y = y
61+
self.z = z
62+
self.o = o
63+
self.minZ = minZ
64+
self.maxZ = maxZ
65+
66+
@classmethod
67+
def from_bytes(cls, data):
68+
if len(data) < 16: # At least 4 ints
69+
raise ValueError("Not enough data to decode ImageRequest")
70+
71+
version, request_type, width, height = struct.unpack_from(">iiii", data, 0)
72+
73+
# If there's more data, we have the optional fields
74+
if len(data) > 16:
75+
offset = 16
76+
x, offset = Vector3d.from_bytes(data, offset)
77+
y, offset = Vector3d.from_bytes(data, offset)
78+
z, offset = Vector3d.from_bytes(data, offset)
79+
o, offset = Vector3d.from_bytes(data, offset)
80+
minZ, maxZ = struct.unpack_from(">dd", data, offset)
81+
82+
return cls(version, request_type, width, height, x, y, z, o, minZ, maxZ)
83+
else:
84+
return cls(version, request_type, width, height)
85+
86+
@register
87+
class LiveVizGroup(Chare):
88+
89+
def __init__(self, cb, poll):
90+
self.callback = cb
91+
self.poll = poll
92+
charm.CcsRegisterHandler("lvImage", self.image_handler)
93+
if poll:
94+
self.requests = deque()
95+
self.images = deque()
96+
97+
def send(self, result):
98+
image = ByteImage.from_contributions(result, LiveViz.cfg.isColor)
99+
if self.poll:
100+
if len(self.requests) > 0:
101+
req, delayed = self.requests.popleft()
102+
output = ByteImage.with_image_in_corner(image, req.width, req.height)
103+
charm.CcsSendDelayedReply(delayed, output.to_binary())
104+
else:
105+
print("sent")
106+
self.images.append(image)
107+
else:
108+
output = ByteImage.with_image_in_corner(image, self.wid, self.ht)
109+
charm.CcsSendDelayedReply(self.reply, output.to_binary())
110+
111+
def image_handler(self, msg):
112+
request = ImageRequest.from_bytes(msg)
113+
if self.poll:
114+
if len(self.images) > 0:
115+
output = ByteImage.with_image_in_corner(self.images.popleft(), request.width, request.height)
116+
charm.CcsSendReply(output.to_binary())
117+
else:
118+
self.requests.append((request, charm.CcsDelayReply()))
119+
else:
120+
self.ht = request.height
121+
self.wid = request.width
122+
self.callback(request)
123+
self.reply = charm.CcsDelayReply()
124+
125+
class ByteImage:
126+
def __init__(self, data=None, width=0, height=0, is_color=True):
127+
"""
128+
Initialize a byte image
129+
130+
Args:
131+
data (bytes, optional): Raw image data as bytes, or None to create empty image
132+
width (int): Image width in pixels
133+
height (int): Image height in pixels
134+
is_color (bool): Whether the image is in color (True) or grayscale (False)
135+
"""
136+
self.width = width
137+
self.height = height
138+
self.is_color = is_color
139+
self.bytes_per_pixel = 3 if is_color else 1
140+
141+
if data is not None:
142+
self.data = data
143+
else:
144+
self.data = bytes(width * height * self.bytes_per_pixel)
145+
146+
@classmethod
147+
def from_contributions(cls, contribs, is_color=True):
148+
"""
149+
Create a ByteImage from multiple contributions, positioning each
150+
contribution at the right location.
151+
152+
Args:
153+
contribs (list): List of tuples with format
154+
(bytes_data, startx, starty, local_height, local_width, total_height, total_width)
155+
is_color (bool): Whether the image is in color
156+
157+
Returns:
158+
ByteImage: A composite image with all contributions in the right positions
159+
"""
160+
_, _, _, _, _, total_height, total_width = contribs[0]
161+
bytes_per_pixel = 3 if is_color else 1
162+
163+
buffer = bytearray(total_width * total_height * bytes_per_pixel)
164+
165+
for data, startx, starty, local_height, local_width, _, _ in contribs:
166+
for y in range(local_height):
167+
for x in range(local_width):
168+
src_pos = (y * local_width + x) * bytes_per_pixel
169+
dst_pos = ((starty + y) * total_width + (startx + x)) * bytes_per_pixel
170+
171+
if src_pos + bytes_per_pixel <= len(data):
172+
buffer[dst_pos:dst_pos + bytes_per_pixel] = (buffer[dst_pos:dst_pos + bytes_per_pixel] + data[src_pos:src_pos + bytes_per_pixel]) % 256
173+
174+
return cls(bytes(buffer), total_width, total_height, is_color)
175+
176+
def to_binary(self):
177+
return self.data
178+
179+
@classmethod
180+
def with_image_in_corner(cls, src_image, new_width, new_height):
181+
"""
182+
Create a new image with specified dimensions and place the source image
183+
in the top left corner.
184+
185+
Args:
186+
src_image (ByteImage): Source image to place in the corner
187+
new_width (int): Width of the new image
188+
new_height (int): Height of the new image
189+
190+
Returns:
191+
ByteImage: A new image with the source image in the top left corner
192+
"""
193+
dest_image = cls(None, new_width, new_height, src_image.is_color)
194+
bytes_per_pixel = dest_image.bytes_per_pixel
195+
196+
buffer = bytearray(new_width * new_height * bytes_per_pixel)
197+
198+
# Calculate dimensions to copy
199+
copy_width = min(new_width, src_image.width)
200+
copy_height = min(new_height, src_image.height)
201+
202+
for y in range(copy_height):
203+
for x in range(copy_width):
204+
src_pos = (y * src_image.width + x) * bytes_per_pixel
205+
206+
dst_pos = (y * new_width + x) * bytes_per_pixel
207+
208+
if src_pos + bytes_per_pixel <= len(src_image.data):
209+
buffer[dst_pos:dst_pos + bytes_per_pixel] = src_image.data[src_pos:src_pos + bytes_per_pixel]
210+
211+
return cls(bytes(buffer), new_width, new_height, src_image.is_color)
212+
213+
class LiveViz:
214+
cfg = None
215+
216+
@classmethod
217+
def config_handler(cls, msg):
218+
charm.CcsSendReply(cls.cfg.to_binary())
219+
220+
@classmethod
221+
def deposit(cls, buffer, elem, x, y, ht, wid, g_ht, g_wid):
222+
elem.reduce(group.send, data=(buffer,x,y,ht,wid,g_ht,g_wid), reducer=Reducer.viz_gather)
223+
224+
@classmethod
225+
def init(cls, cfg, cb, poll=False):
226+
global group
227+
cls.cfg = cfg
228+
grp = Chare(LiveVizGroup, args=[cb, poll], onPE=0)
229+
charm.thisProxy.updateGlobals({'group': grp}, awaitable=True, module_name='charm4py.liveviz').get()
230+
charm.CcsRegisterHandler("lvConfig", cls.config_handler)

docs/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ to the largest supercomputers.
4040
channels
4141
sections
4242
pool
43+
liveviz
4344
rules
4445
gpus
4546

0 commit comments

Comments
 (0)