Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
9260d09
Added flattrade dummy websocket
mohamedmuzamil008 Jul 17, 2025
84169bd
Added Kafka producer and consumer
mohamedmuzamil008 Jul 17, 2025
07e0472
Added Kafka in the base and flattrade adapter
mohamedmuzamil008 Jul 17, 2025
fdc2a8f
Changed the openalgo API key
mohamedmuzamil008 Jul 17, 2025
079c944
Added few logs
mohamedmuzamil008 Jul 17, 2025
25c2a7b
Added timescaledb setup and configuration
mohamedmuzamil008 Jul 17, 2025
a4dfa5e
Deleted sample env file
mohamedmuzamil008 Jul 17, 2025
09e9878
created sample env
mohamedmuzamil008 Jul 17, 2025
b0bcda6
updated sample env
mohamedmuzamil008 Jul 17, 2025
8d97350
Added redpanda and timescaledb config and setup
mohamedmuzamil008 Jul 17, 2025
6cfc6da
Timescaledb upload fully working
mohamedmuzamil008 Jul 18, 2025
13dd434
Merge pull request #2 from mohamedmuzamil008/timescaledbconfiguration
mohamedmuzamil008 Jul 18, 2025
5008d15
Complete working backtesting modules
mohamedmuzamil008 Jul 20, 2025
8624a59
minor changes and removing unwanted codes
mohamedmuzamil008 Jul 20, 2025
8a6dcac
Working backtesting module - Included minor changes
mohamedmuzamil008 Jul 21, 2025
80ef507
Fully working backtesting module completed
mohamedmuzamil008 Jul 24, 2025
a2af8e3
backtesting module optimized for performance
mohamedmuzamil008 Jul 24, 2025
cb0581a
Removed unwanted codes, added strategy 9
mohamedmuzamil008 Jul 25, 2025
c1ac2e6
Increased the pool_size, max_overflow value and timeout in auth db
mohamedmuzamil008 Jul 30, 2025
1015168
Nifty added and strategy minor changes
mohamedmuzamil008 Aug 1, 2025
3a39945
Completely backtested and working strategy
mohamedmuzamil008 Aug 3, 2025
73368d5
Completely backtested code - ALL STRATEGIES WORKING EXCEPT 12
mohamedmuzamil008 Aug 7, 2025
d0108fe
Fully backtested enhanced code - Added 15m trend identification
mohamedmuzamil008 Aug 8, 2025
6f2f211
Minor changes
mohamedmuzamil008 Aug 21, 2025
d6d2c57
Merge pull request #3 from mohamedmuzamil008/Historical-data-fetch
mohamedmuzamil008 Aug 21, 2025
d749e6c
Merge branch 'main' into main
mohamedmuzamil008 Aug 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,14 @@ checkpoints.json
# CSV Files (except symbols.csv)
*.csv
!symbols.csv

*.png
*.txt
# NOTE(review): '*.py' ignores every Python source file in the repo — confirm this is intended
*.py
!all_symbols.csv


# Environments
.env
.venv
Expand Down
1 change: 0 additions & 1 deletion .sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -204,4 +204,3 @@ CSRF_TIME_LIMIT = ''
# Examples: 'instance1_session', 'user1_session', 'app_session', etc.
SESSION_COOKIE_NAME = 'session'
CSRF_COOKIE_NAME = 'csrf_token'

Binary file added Live Trading.docx
Binary file not shown.
3 changes: 2 additions & 1 deletion blueprints/apikey.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ def manage_api_key():

# Generate new API key
api_key = generate_api_key()

logger.info(f"Generated API key for user: {api_key}")

# Store the API key (auth_db will handle both hashing and encryption)
key_id = upsert_api_key(user_id, api_key)

Expand Down
1 change: 1 addition & 0 deletions blueprints/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def setup():

# Automatically generate and save API key
api_key = generate_api_key()
logger.info(f"Generated API key for user: {api_key}")
key_id = upsert_api_key(username, api_key)
if not key_id:
logger.error(f"Failed to create API key for user {username}")
Expand Down
2 changes: 1 addition & 1 deletion broker/flattrade/api/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def get_api_response(endpoint, auth, method="POST", payload=None):
data = response.text

# Print raw response for debugging
logger.info(f"Raw Response: {data}")
# logger.info(f"Raw Response: {data}")

try:
return json.loads(data)
Expand Down
40 changes: 39 additions & 1 deletion broker/flattrade/streaming/flattrade_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,14 @@ def normalize(data: Dict[str, Any], msg_type: str) -> Dict[str, Any]:
return result


# RedPanda/Kafka imports — kafka-python is an optional dependency.
# When it is missing, KAFKA_AVAILABLE stays False and the adapter
# degrades to ZMQ-only publishing (see _setup_connection_management).
try:
    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    KAFKA_AVAILABLE = True
except ImportError:
    KAFKA_AVAILABLE = False

class FlattradeWebSocketAdapter(BaseBrokerWebSocketAdapter):
"""Flattrade WebSocket adapter with improved structure and error handling"""

Expand Down Expand Up @@ -256,8 +264,17 @@ def _setup_connection_management(self):
"""Initialize connection management"""
self.running = False
self.connected = False

# Kafka/RedPanda specific attributes
self.kafka_enabled = self.redpanda_enabled and KAFKA_AVAILABLE
self.kafka_publish_lock = threading.Lock()
self.lock = threading.Lock()
self.reconnect_attempts = 0

if self.kafka_enabled:
self.logger.info("Kafka publishing enabled for Flattrade adapter")
else:
self.logger.info("Kafka publishing disabled - using ZMQ only")

def _setup_normalizers(self):
"""Initialize data normalizers"""
Expand Down Expand Up @@ -567,7 +584,7 @@ def _resubscribe_all(self):
elif mode == Config.MODE_DEPTH:
depth_scrips.add(scrip)
self.ws_subscription_refs[scrip]['depth_count'] += 1

# Resubscribe in batches
if touchline_scrips:
scrip_list = '#'.join(touchline_scrips)
Expand Down Expand Up @@ -603,6 +620,27 @@ def _on_message(self, ws, message):
self.logger.error(f"JSON decode error: {e}, message: {message}")
except Exception as e:
self.logger.error(f"Message processing error: {e}", exc_info=True)

def publish_market_data(self, topic: str, data: dict) -> None:
    """Publish one normalized tick to ZMQ and, when enabled, to Kafka.

    Args:
        topic: ZMQ topic string; reused as the Kafka message key so
            downstream consumers can partition/filter by scrip.
        data: JSON-serializable normalized market-data payload.
    """
    # All ticks go to a single Kafka topic, keyed by the ZMQ topic.
    kafka_topic = "tick_data"

    # --- 1) ZMQ Publish (existing) ---
    if self.zmq_enabled:
        # Lazy %-args: no formatting cost when DEBUG is disabled. Logging the
        # full payload per tick at INFO level would flood the log on a live feed.
        self.logger.debug("[ZMQ PUBLISH] Topic: %s | Data: %s", topic, data)
        super().publish_market_data(topic, data)

    # --- 2) Kafka Publish (new) ---
    if self.kafka_enabled and self.kafka_producer:
        try:
            with self.kafka_publish_lock:
                # The KafkaProducer was set up in
                # BaseBrokerWebSocketAdapter._setup_redpanda(); we assume the
                # topic exists or broker auto-creation is enabled.
                # send() is asynchronous — flushing per tick would be costly.
                self.kafka_producer.send(kafka_topic, key=topic, value=data)
            self.logger.debug(
                "[KAFKA PUBLISH] Topic: %s | Key: %s | Data: %s",
                kafka_topic, topic, data,
            )
        except KafkaError as e:
            # BUG FIX: the original f-strings nested double quotes inside
            # double-quoted f-strings ({"tick_data"}), a SyntaxError on
            # Python < 3.12.
            self.logger.error(
                "[KAFKA ERROR] Failed to publish to Kafka topic %s:%s: %s",
                kafka_topic, topic, e,
            )

def _process_market_message(self, data: Dict[str, Any]) -> None:
"""Process market data messages with better error handling"""
Expand Down
202 changes: 202 additions & 0 deletions broker/flattrade/streaming/flattrade_dummy_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
import asyncio
import websockets
import json
import random
from datetime import datetime

# Mock auth database (currently unused by handle_connection — auth is always acked).
# NOTE(review): these look like a real user id / client id — confirm they are
# dummy values before committing.
valid_tokens = {
    "valid_token_123": {"user_id": "FZ15709", "client_id": "74eb594de4a944558aeacd623a714d16"}
}

# Initialize market data variables.
# Simulated touchline state for scrip 1 (NSE token 2885 — presumably
# RELIANCE-EQ; the "ts" field is commented out in the subscribe ack).
last_price = 1300.0
volume = 300
open_price = 1296.0
high = 1302.0
low = 1294.0
close = 1299.5

# Simulated touchline state for scrip 2 (NSE token 11536, TCS-EQ).
last_price_2 = 425.0
volume_2 = 1000
open_price_2 = 422.0
high_2 = 430.0
low_2 = 420.0
close_2 = 426.0

async def handle_connection(websocket):
    """Serve one mock Flattrade session over *websocket*.

    Understands three client message types:
      * ``"c"`` — auth request; always acked with ``"ck"/OK`` (valid_tokens
        is not actually checked — this is a mock).
      * ``"t"`` — touchline subscribe; acks with one ``"tk"`` snapshot per
        scrip, then starts the background streamer task.
      * ``"u"`` — unsubscribe; stops the streamer and acks with ``"uk"``.
    """
    print("Client connected")
    authenticated = False
    subscribed = False
    market_data_task = None  # background send_market_data() task, if running

    try:
        async for message in websocket:
            try:
                data = json.loads(message)
                print(f"Received: {data}")

                # Authentication handling
                if data.get("t") == "c" and not authenticated:
                    response = {
                        "t": "ck",
                        "s": "OK",
                        "uid": "FZ15709"
                    }
                    authenticated = True
                    await websocket.send(json.dumps(response))
                    print("Authentication successful")

                # Subscription handling: send one "tk" snapshot per scrip.
                elif data.get("t") == "t" and authenticated and not subscribed:
                    response = {
                        "t": "tk",
                        "e": "NSE",
                        "tk": "2885",
                        #"ts": "RELIANCE-EQ",
                        "ti": "1",
                        "ls": "1",
                        "lp": str(last_price),
                        "pc": "0.5",
                        "v": str(volume),
                        "o": str(open_price),
                        "h": str(high),
                        "l": str(low),
                        "c": str(close),
                        "ap": str(last_price)
                    }
                    response_2 = {
                        "t": "tk",
                        "e": "NSE",
                        "tk": "11536",
                        "ts": "TCS-EQ",
                        "ti": "1",
                        "ls": "1",
                        "lp": str(last_price_2),
                        "pc": "0.5",
                        "v": str(volume_2),
                        "o": str(open_price_2),
                        "h": str(high_2),
                        "l": str(low_2),
                        "c": str(close_2),
                        "ap": str(last_price_2)
                    }
                    subscribed = True
                    await websocket.send(json.dumps(response))
                    await websocket.send(json.dumps(response_2))
                    print("Subscribed")

                    # Start the continuous feed only after the snapshot acks.
                    market_data_task = asyncio.create_task(send_market_data(websocket))

                elif data.get("t") == "u" and subscribed:
                    if market_data_task:
                        market_data_task.cancel()
                        try:
                            await market_data_task
                        except asyncio.CancelledError:
                            pass
                        market_data_task = None

                    response = {
                        "t": "uk",
                        "k": "NSE|2885#NSE|11536"
                    }
                    subscribed = False
                    await websocket.send(json.dumps(response))
                    print("Unsubscribed")

                # Anything else: only report an error when unauthenticated;
                # authenticated-but-unknown messages are silently ignored.
                else:
                    if not authenticated:
                        response = {
                            "t": "error",
                            "emsg": "Not authenticated"
                        }
                        await websocket.send(json.dumps(response))
                        print("Not Authenticated")

            except json.JSONDecodeError:
                response = {
                    "t": "error",
                    "emsg": "Invalid JSON"
                }
                await websocket.send(json.dumps(response))

    except websockets.exceptions.ConnectionClosed:
        print("Client disconnected")
    finally:
        # BUG FIX: the original only cancelled the streamer inside the
        # ConnectionClosed handler, so a clean client disconnect (or any
        # other exception) leaked the background task. Always cancel here.
        if market_data_task:
            market_data_task.cancel()

async def send_market_data(websocket):
    """Push a simulated touchline ("tf") update for both mock scrips every 2 s.

    Random-walks the module-level OHLCV state so successive ticks look alive.
    Runs until cancelled or the connection drops.
    """
    global last_price, volume, open_price, high, low, close
    global last_price_2, volume_2, open_price_2, high_2, low_2, close_2

    def touchline(token, lp, vol, o, h, l, c):
        # Field order matches the real feed's JSON layout.
        return {
            "t": "tf",
            "e": "NSE",
            "tk": token,
            "lp": str(lp),
            "pc": "0.5",
            "v": str(vol),
            "o": str(o),
            "h": str(h),
            "l": str(l),
            "c": str(c),
            "ap": str(lp),
        }

    print("Starting market data stream")
    while True:
        try:
            # Random-walk scrip 1 (token 2885).
            last_price += round(random.uniform(-2, 2), 2)
            volume += random.randint(100, 1000)
            open_price += round(random.uniform(-2, 2), 2)
            high += round(random.uniform(-2, 2), 2)
            low += round(random.uniform(-2, 2), 2)
            close += round(random.uniform(-2, 2), 2)

            # Random-walk scrip 2 (token 11536).
            last_price_2 += round(random.uniform(-2, 2), 2)
            volume_2 += random.randint(100, 1000)
            open_price_2 += round(random.uniform(-2, 2), 2)
            high_2 += round(random.uniform(-2, 2), 2)
            low_2 += round(random.uniform(-2, 2), 2)
            close_2 += round(random.uniform(-2, 2), 2)

            tick_1 = touchline("2885", last_price, volume,
                               open_price, high, low, close)
            tick_2 = touchline("11536", last_price_2, volume_2,
                               open_price_2, high_2, low_2, close_2)

            await websocket.send(json.dumps(tick_1))
            await websocket.send(json.dumps(tick_2))
            print("Sent market data update")
            await asyncio.sleep(2)

        except websockets.exceptions.ConnectionClosed:
            print("Connection closed while sending market data")
            break
        except Exception as e:
            print(f"Error in market data stream: {e}")
            break

async def main():
    """Serve the dummy Flattrade feed on ws://localhost:8766 until interrupted."""
    async with websockets.serve(handle_connection, "localhost", 8766):
        print("Dummy WebSocket server running on ws://localhost:8766")
        # Park forever; Ctrl+C tears down the event loop.
        await asyncio.Future()


if __name__ == "__main__":
    asyncio.run(main())
2 changes: 2 additions & 0 deletions broker/flattrade/streaming/flattrade_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ class FlattradeWebSocket:

# Connection constants
WS_URL = "wss://piconnect.flattrade.in/PiConnectWSTp/"
#WS_URL = "ws://localhost:8766"

CONNECTION_TIMEOUT = 15
THREAD_JOIN_TIMEOUT = 5

Expand Down
17 changes: 17 additions & 0 deletions broker/flattrade/streaming/kafkaconsum.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""Quick debug consumer: tail the 'tick_data' topic and print each record."""
from kafka import KafkaConsumer
import json


def _decode_key(raw):
    # Keys may be absent on some records.
    return raw.decode('utf-8') if raw else None


def _decode_value(raw):
    return json.loads(raw.decode('utf-8'))


consumer = KafkaConsumer(
    'tick_data',
    bootstrap_servers='localhost:9092',
    group_id='test-group',
    auto_offset_reset='earliest',  # replay from the beginning on first run
    key_deserializer=_decode_key,
    value_deserializer=_decode_value,
)

print("Listening for messages...")

for record in consumer:
    print(f"{record.key} => {record.value}")
20 changes: 20 additions & 0 deletions broker/flattrade/streaming/kafkaprod.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""One-shot debug producer: push a single sample tick onto 'tick_raw'."""
from kafka import KafkaProducer
import json


producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    key_serializer=lambda key: key.encode('utf-8'),
    value_serializer=lambda value: json.dumps(value).encode('utf-8'),
)

# Hard-coded sample tick, just to verify the pipeline end to end.
sample_tick = {
    "symbol": "RELIANCE",
    "ltp": 2810.50,
    "volume": 100,
    "timestamp": 1720458436000,
}

producer.send('tick_raw', key='RELIANCE', value=sample_tick)
producer.flush()  # block until the broker acks so the script exits cleanly
print("Tick sent!")
Loading