33import os
44import apache_beam as beam
55from apache_beam .options .pipeline_options import PipelineOptions
6+ from apache_beam .transforms .window import FixedWindows
67import requests
7- from typing import Dict , Any
8+ from typing import Dict , Any , List
9+ from datetime import timedelta
810
class PubSubToBetterStack(beam.DoFn):
    """Forward JSON-encoded Pub/Sub messages to Better Stack in batches.

    Each incoming element is decoded as UTF-8 JSON, its ``timestamp`` key (if
    present) is renamed to ``dt``, and the record is buffered. The buffer is
    POSTed to the ingesting URL as a single JSON array once it holds
    ``batch_size`` records, and again at the end of every bundle.

    Delivery is best-effort: parse and delivery errors are printed and never
    raised, so a bad message or an outage does not fail the pipeline.
    """

    # Seconds to wait on the HTTP request. Without an explicit timeout
    # ``requests`` waits indefinitely, which would stall the worker.
    REQUEST_TIMEOUT_SECONDS = 30

    # Undelivered records are kept for retry, but only up to this many
    # batches' worth; beyond that they are dropped so memory stays bounded.
    MAX_PENDING_BATCHES = 10

    def __init__(self, source_token: str, ingesting_host: str, batch_size: int = 100):
        """Store connection settings and start with an empty buffer.

        Args:
            source_token: Bearer token sent in the ``Authorization`` header.
            ingesting_host: Host name or full URL to POST to; ``https://`` is
                prepended when no scheme is present.
            batch_size: Number of records to buffer before sending. Values
                below 1 are treated as 1 (every record is sent immediately).
        """
        self.source_token = source_token
        self.ingesting_url = (
            ingesting_host if '://' in ingesting_host else f'https://{ingesting_host}'
        )
        self.batch_size = max(1, batch_size)
        self.headers = {
            'Authorization': f'Bearer {source_token}',
            'Content-Type': 'application/json',
        }
        self.batch: List[Dict[str, Any]] = []

    def process(self, element: bytes) -> None:
        """Parse one Pub/Sub payload, buffer it, and flush on a full batch.

        Args:
            element: Raw message payload, expected to be UTF-8 encoded JSON.
        """
        try:
            # Parse the Pub/Sub data
            data = json.loads(element.decode('utf-8'))

            # Rename timestamp key to dt to be understood by Better Stack
            if 'timestamp' in data:
                data['dt'] = data.pop('timestamp')
        except Exception as e:
            # Log the error but don't fail the pipeline
            print(f"Error processing message: {e}")
            return

        self.batch.append(data)

        # Flush at every multiple of batch_size. In normal operation the
        # buffer is emptied after each send, so this fires exactly when it
        # reaches batch_size; after a failed send it retries once per further
        # batch_size records instead of on every single element.
        if len(self.batch) % self.batch_size == 0:
            self._send_batch()

    def finish_bundle(self):
        """Send whatever is still buffered when the bundle ends."""
        if self.batch:
            self._send_batch()

    def _send_batch(self) -> None:
        """POST the buffered records; never raises.

        Any status code other than 202 is treated as a failure. On success
        the buffer is emptied. On failure the records are kept for a later
        retry unless the backlog has reached ``MAX_PENDING_BATCHES`` batches,
        in which case they are dropped and the loss is logged.
        """
        if not self.batch:
            return

        try:
            # Send batch to Better Stack
            response = requests.post(
                self.ingesting_url,
                headers=self.headers,
                json=self.batch,
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )

            if response.status_code != 202:
                raise RuntimeError(f"Failed to send to Better Stack: {response.text}")
        except Exception as e:
            # Log the error but don't fail the pipeline
            print(f"Error sending batch to Better Stack: {e}")

            # Bound the retry backlog so a long outage cannot grow the
            # buffer (and the request payload) without limit.
            if len(self.batch) >= self.batch_size * self.MAX_PENDING_BATCHES:
                print(
                    f"Dropping {len(self.batch)} undelivered messages "
                    f"after repeated Better Stack failures"
                )
                self.batch = []
        else:
            # Clear the batch after successful send
            self.batch = []
4064
4165def run (argv = None ):
4266 parser = argparse .ArgumentParser ()
@@ -55,6 +79,18 @@ def run(argv=None):
5579 required = True ,
5680 help = 'The ingesting host of your telemetry source in Better Stack'
5781 )
82+ parser .add_argument (
83+ '--batch_size' ,
84+ default = 100 ,
85+ type = int ,
86+ help = 'Number of messages to batch before sending to Better Stack'
87+ )
88+ parser .add_argument (
89+ '--window_size' ,
90+ default = 10 ,
91+ type = int ,
92+ help = 'Window size in seconds for batching messages'
93+ )
5894 known_args , pipeline_args = parser .parse_known_args (argv )
5995
6096 pipeline_options = PipelineOptions (
@@ -68,13 +104,17 @@ def run(argv=None):
68104 | 'Read from Pub/Sub' >> beam .io .ReadFromPubSub (
69105 subscription = known_args .input_subscription
70106 )
107+ | 'Window into fixed windows' >> beam .WindowInto (
108+ FixedWindows (known_args .window_size )
109+ )
71110 | 'Send to Better Stack' >> beam .ParDo (
72111 PubSubToBetterStack (
73112 known_args .better_stack_source_token ,
74- known_args .better_stack_ingesting_host
113+ known_args .better_stack_ingesting_host ,
114+ known_args .batch_size
75115 )
76116 )
77117 )
78118
79119if __name__ == '__main__' :
80- run ()
120+ run ()
0 commit comments