Skip to content

Streamz with websocket not streaming any data #474

@leahpence

Description

@leahpence

I am trying to figure out the right way to stream data using streamz with a websocket. The data is received over the websocket, but nothing comes out of the stream.

import config
import websocket, json
import pandas as pd
from streamz.dataframe import DataFrame
from streamz import Stream
import time

# Symbols to subscribe to; the same list labels the rows of the quote table.
ticker_list = ["SCHG"]

# Quote table with one row per ticker. No data is supplied here, so the cells
# stay empty until update_dataframe fills them in.
df = pd.DataFrame(index=ticker_list, columns=['Bid', 'Ask', 'Time'])

# Head of the streamz pipeline; update_dataframe pushes table snapshots into it.
source = Stream()

def update_dataframe(json_message, df):
    """Apply every quote in a decoded payload to ``df`` and emit a snapshot.

    Args:
        json_message: Decoded JSON payload. Either a list of message dicts or
            a single message dict.
        df: Quote table indexed by ticker symbol with ``Bid``, ``Ask`` and
            ``Time`` columns. It is updated in place.

    Elements that do not carry all of the ``S``, ``bp``, ``ap`` and ``t`` keys
    are skipped instead of raising ``KeyError``. A copy of ``df`` is emitted on
    ``source`` once per payload, and only if at least one row changed.
    """
    required_keys = {'S', 'bp', 'ap', 't'}

    # Normalise to a list so a bare dict is handled like a one-element payload.
    messages = json_message if isinstance(json_message, list) else [json_message]

    updated = False
    for ticker_data in messages:
        # NOTE(review): presumably the feed also sends non-quote messages
        # (connection/auth/subscription acknowledgements) -- confirm against
        # the provider's message reference. Those lack the quote fields.
        if not isinstance(ticker_data, dict) or not required_keys <= ticker_data.keys():
            continue

        raw_time = ticker_data['t']
        if isinstance(raw_time, str):
            # Date strings are parsed directly; utc=True yields a UTC-aware
            # timestamp whether or not the string carries its own offset.
            timestamp = pd.to_datetime(raw_time, utc=True)
        else:
            # Numeric values keep the original interpretation: epoch seconds.
            timestamp = pd.to_datetime(raw_time, unit='s', utc=True)

        ticker_symbol = ticker_data['S']
        df.loc[ticker_symbol, 'Bid'] = ticker_data['bp']
        df.loc[ticker_symbol, 'Ask'] = ticker_data['ap']
        df.loc[ticker_symbol, 'Time'] = (
            timestamp.tz_convert('America/New_York').strftime('%Y-%m-%d %H:%M:%S%z')
        )
        updated = True

    if updated:
        # Emit a copy so later in-place updates cannot alter what consumers hold.
        source.emit(df.copy())

def on_open(ws):
    """Authenticate and subscribe to quotes as soon as the socket opens.

    Args:
        ws: The open websocket connection; both requests are sent through it.
    """
    print("opened connection")

    # The auth request goes out first, using the credentials from config.
    credentials = {"action": "auth", "key": config.API_KEY, "secret": config.SECRET_KEY}
    ws.send(json.dumps(credentials))

    # Then request quote updates for every symbol in ticker_list.
    subscription = {"action": "subscribe", "quotes": ticker_list}
    ws.send(json.dumps(subscription))
    print('subscribed')

def on_message(ws, message):
    """Decode an incoming text frame and hand it to update_dataframe.

    Args:
        ws: The websocket connection (unused).
        message: JSON text received from the server.
    """
    # update_dataframe mutates the module-level table and emits it downstream.
    update_dataframe(json.loads(message), df)

def on_close(ws, close_status_code=None, close_msg=None):
    """Report that the websocket connection has closed.

    Args:
        ws: The websocket connection (unused).
        close_status_code: Close code passed by the websocket library, if any.
        close_msg: Close reason passed by the websocket library, if any.

    The two extra parameters default to ``None`` so the handler works whether
    the library calls it with one argument or with three.
    """
    print("closed connection")

def print_result(x: object) -> None:
    """Write ``x`` to standard output; used as the stream's consumer."""
    print(x)

# Attach a consumer to the stream: every emitted DataFrame is passed to
# print_result. The node is kept in a module-level name so it stays referenced.
sink = source.map(print_result)


def on_error(ws, error):
    """Print errors reported by the websocket library.

    Args:
        ws: The websocket connection (unused).
        error: The exception describing what went wrong.

    NOTE(review): websocket-client is documented to route exceptions raised
    inside the other callbacks here; with no handler registered such failures
    are easy to miss and the stream simply appears to stay silent.
    """
    print("websocket error:", repr(error))


socket = "wss://stream.data.alpaca.markets/v2/iex"
ws = websocket.WebSocketApp(
    socket,
    on_open=on_open,
    on_message=on_message,
    on_error=on_error,
    on_close=on_close,
)
# Blocks here, dispatching the callbacks above until the connection ends.
ws.run_forever()

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions