-
Notifications
You must be signed in to change notification settings - Fork 149
Open
Description
I am trying to figure out a right way of streaming data using streamz with websocket. My streaming data is loaded using websocket but not streaming anything.
import config
import websocket, json
import pandas as pd
from streamz.dataframe import DataFrame
from streamz import Stream
import time
ticker_list = ["SCHG"]
# Create DataFrame with specified columns and ticker_list as index
df = pd.DataFrame(columns=['Bid', 'Ask', 'Time'], index=ticker_list)
source = Stream()
def update_dataframe(json_message, df):
ticker_data = json_message[0]
ticker_symbol = ticker_data['S']
df.loc[ticker_symbol, 'Bid'] = ticker_data['bp']
df.loc[ticker_symbol, 'Ask'] = ticker_data['ap']
df.loc[ticker_symbol, 'Time'] = pd.to_datetime(ticker_data['t'], unit='s').tz_localize('UTC').tz_convert('America/New_York').strftime('%Y-%m-%d %H:%M:%S%z')
source.emit(df.copy()) # Emit the updated DataFrame to the stream
def on_open(ws):
print("opened connection")
# Authentication
auth_data = {"action":"auth","key": config.API_KEY,"secret": config.SECRET_KEY}
# Subscribe
ws.send(json.dumps(auth_data))
listen_message = {"action":"subscribe","quotes":ticker_list}
ws.send(json.dumps(listen_message))
print('subscribed')
def on_message(ws, message):
json_message = json.loads(message)
# Update the DataFrame and emit it to the stream
update_dataframe(json_message, df)
def on_close(ws):
print("closed connection")
def print_result(x):
print(x)
sink = source.map(print_result) # Set up a sink to consume and process the emitted data
socket = "wss://stream.data.alpaca.markets/v2/iex"
ws = websocket.WebSocketApp(socket, on_open=on_open, on_message=on_message, on_close=on_close)
ws.run_forever()
Metadata
Metadata
Assignees
Labels
No labels