1- """IP blocking and unblocking functionality with cross-platform support"""
1+ """IP blocking and unblocking functionality with IPv4 + IPv6 + CIDR + cross-platform support"""
22
33import subprocess
44import threading
55import platform
6+ import tempfile
7+ import shutil
8+ import ipaddress
69from datetime import datetime , timedelta
10+ from pathlib import Path
711from typing import Dict , Set , List
812from colorama import Fore , Style
913from utils .logger import get_logger
1014from utils .system import get_platform_firewall_command
11- import ipaddress
1215
1316
1417class IPBlocker :
1518 """Handles IP blocking operations across different platforms"""
16-
19+
1720 def __init__ (self , block_duration : int , whitelist : Set [str ]):
1821 self .block_duration = block_duration
1922 self .whitelist = whitelist
@@ -23,35 +26,35 @@ def __init__(self, block_duration: int, whitelist: Set[str]):
2326 self .platform = platform .system ().lower ()
2427 self .firewall_cmd = get_platform_firewall_command ()
2528
26- # Added helper for IP validation
27- def _validate_ip (self , ip : str ) -> str :
28- """
29- Validate and normalize an IP or CIDR (IPv4 or IPv6).
30- Raises ValueError if invalid.
31- """
32- try :
33- network = ipaddress .ip_network (ip , strict = False )
34- return str (network )
35- except ValueError as e :
36- self .logger .error (f"Invalid IP/network '{ ip } ': { e } " )
37- raise ValueError (f"Invalid IP/network: { ip } " )
29+ # macOS setup for pfctl
30+ if self .platform == "darwin" :
31+ self ._init_macos_firewall ()
3832
39- def block_ip (self , ip : str , reason : str ) -> bool :
40- """Block an IP address using the appropriate firewall system """
33+ def _is_valid_ip (self , ip : str ) -> bool :
34+ """Validate IPv4, IPv6, and CIDR ranges. """
4135 try :
42- ip = self ._validate_ip (ip ) # Validate IP first
36+ ipaddress .ip_network (ip , strict = False )
37+ return True
4338 except ValueError :
4439 return False
4540
41+ def _is_whitelisted (self , ip : str ) -> bool :
42+ return ip in self .whitelist
43+
44+ def block_ip (self , ip : str , reason : str ) -> bool :
45+ """Block an IP or CIDR range."""
46+ if not self ._is_valid_ip (ip ):
47+ self .logger .warning (f"Invalid IP/CIDR rejected: { ip } " )
48+ return False
49+
4650 if self ._is_whitelisted (ip ):
4751 self .logger .info (f"IP { ip } is whitelisted, not blocking" )
4852 return False
49-
53+
5054 with self .lock :
5155 if ip not in self .blocked_ips :
5256 try :
5357 success = self ._execute_block_command (ip )
54-
5558 if success :
5659 self .blocked_ips [ip ] = datetime .now ()
5760 self .logger .warning (f"🚫 BLOCKED IP: { ip } - Reason: { reason } " )
@@ -60,26 +63,23 @@ def block_ip(self, ip: str, reason: str) -> bool:
6063 else :
6164 self .logger .error (f"Failed to block IP { ip } " )
6265 return False
63-
6466 except Exception as e :
6567 self .logger .error (f"Exception while blocking IP { ip } : { e } " )
6668 return False
6769 else :
6870 self .logger .debug (f"IP { ip } already blocked" )
6971 return True
70-
72+
7173 def unblock_ip (self , ip : str ) -> bool :
72- """Manually unblock a specific IP address"""
73- try :
74- ip = self ._validate_ip (ip ) # Validate IP before unblocking
75- except ValueError :
74+ """Unblock IPv4/IPv6 address or CIDR."""
75+ if not self ._is_valid_ip (ip ):
76+ self .logger .warning (f"Invalid IP/CIDR for unblock: { ip } " )
7677 return False
7778
7879 with self .lock :
7980 if ip in self .blocked_ips :
8081 try :
8182 success = self ._execute_unblock_command (ip )
82-
8383 if success :
8484 del self .blocked_ips [ip ]
8585 self .logger .info (f"✅ UNBLOCKED IP: { ip } " )
@@ -88,7 +88,6 @@ def unblock_ip(self, ip: str) -> bool:
8888 else :
8989 self .logger .error (f"Failed to unblock IP { ip } " )
9090 return False
91-
9291 except Exception as e :
9392 self .logger .error (f"Exception while unblocking IP { ip } : { e } " )
9493 return False
@@ -97,149 +96,160 @@ def unblock_ip(self, ip: str) -> bool:
9796 return False
9897
9998 def unblock_expired_ips (self ) -> List [str ]:
100- """Unblock IPs that have exceeded the block duration"""
10199 current_time = datetime .now ()
102100 block_duration = timedelta (seconds = self .block_duration )
103101 unblocked_ips = []
104-
102+
105103 with self .lock :
106- expired_ips = [
107- ip for ip , block_time in self .blocked_ips .items ()
108- if current_time - block_time > block_duration
104+ expired = [
105+ ip for ip , t in self .blocked_ips .items ()
106+ if current_time - t > block_duration
109107 ]
110-
111- for ip in expired_ips :
108+
109+ for ip in expired :
112110 if self .unblock_ip (ip ):
113111 unblocked_ips .append (ip )
114-
112+
115113 return unblocked_ips
116-
117- def _is_whitelisted (self , ip : str ) -> bool :
118- """Check if IP is in whitelist"""
119- return ip in self .whitelist
120-
114+
115+
121116 def _execute_block_command (self , ip : str ) -> bool :
122- """Execute platform-specific block command"""
123117 try :
124- if self .platform == ' linux' :
118+ if self .platform == " linux" :
125119 return self ._block_ip_linux (ip )
126- elif self .platform == ' darwin' : # macOS
120+ elif self .platform == " darwin" :
127121 return self ._block_ip_macos (ip )
128- elif self .platform == ' windows' :
122+ elif self .platform == " windows" :
129123 return self ._block_ip_windows (ip )
130124 else :
131- self .logger .error (f"Blocking not implemented for platform : { self .platform } " )
125+ self .logger .error (f"Blocking not implemented for: { self .platform } " )
132126 return False
133127 except Exception as e :
134128 self .logger .error (f"Platform-specific blocking failed: { e } " )
135129 return False
136-
130+
137131 def _execute_unblock_command (self , ip : str ) -> bool :
138- """Execute platform-specific unblock command"""
139132 try :
140- if self .platform == ' linux' :
133+ if self .platform == " linux" :
141134 return self ._unblock_ip_linux (ip )
142- elif self .platform == ' darwin' : # macOS
135+ elif self .platform == " darwin" :
143136 return self ._unblock_ip_macos (ip )
144- elif self .platform == ' windows' :
137+ elif self .platform == " windows" :
145138 return self ._unblock_ip_windows (ip )
146139 else :
147- self .logger .error (f"Unblocking not implemented for platform : { self .platform } " )
140+ self .logger .error (f"Unblocking not implemented for: { self .platform } " )
148141 return False
149142 except Exception as e :
150143 self .logger .error (f"Platform-specific unblocking failed: { e } " )
151144 return False
152-
145+
146+
153147 def _block_ip_linux (self , ip : str ) -> bool :
154- """Block IP using iptables on Linux"""
155- cmd = ['sudo' , 'iptables' , '-A' , 'INPUT' , '-s' , ip , '-j' , 'DROP' ]
156- result = subprocess .run (cmd , check = True , capture_output = True , text = True )
148+ network = ipaddress .ip_network (ip , strict = False )
149+
150+ if network .version == 4 :
151+ cmd = ["sudo" , "iptables" , "-A" , "INPUT" , "-s" , ip , "-j" , "DROP" ]
152+ else :
153+ cmd = ["sudo" , "ip6tables" , "-A" , "INPUT" , "-s" , ip , "-j" , "DROP" ]
154+
155+ result = subprocess .run (cmd , capture_output = True , text = True )
157156 return result .returncode == 0
158-
157+
159158 def _unblock_ip_linux (self , ip : str ) -> bool :
160- """Unblock IP using iptables on Linux"""
161- cmd = ['sudo' , 'iptables' , '-D' , 'INPUT' , '-s' , ip , '-j' , 'DROP' ]
159+ network = ipaddress .ip_network (ip , strict = False )
160+
161+ if network .version == 4 :
162+ cmd = ["sudo" , "iptables" , "-D" , "INPUT" , "-s" , ip , "-j" , "DROP" ]
163+ else :
164+ cmd = ["sudo" , "ip6tables" , "-D" , "INPUT" , "-s" , ip , "-j" , "DROP" ]
165+
162166 result = subprocess .run (cmd , capture_output = True , text = True )
163167 return result .returncode == 0
164-
168+
169+
170+ def _init_macos_firewall (self ):
171+ """Initialize pfctl and create rules."""
172+ try :
173+ pfctl_path = shutil .which ("pfctl" )
174+ if not pfctl_path :
175+ self .logger .warning ("pfctl not found on macOS." )
176+ return
177+
178+ # Create/verify table
179+ subprocess .run (["sudo" , "pfctl" , "-t" , "blocked_ips" , "-T" , "show" ],
180+ capture_output = True )
181+
182+ self ._reload_macos_rules ()
183+ self .logger .info ("macOS firewall initialized." )
184+ except Exception as e :
185+ self .logger .error (f"Failed to init macOS firewall: { e } " )
186+
187+ def _reload_macos_rules (self ):
188+ """Write pfctl rule file and reload firewall"""
189+ try :
190+ pf_conf = """
191+ table <blocked_ips> persist
192+ block drop in quick from <blocked_ips> to any
193+ block drop out quick from any to <blocked_ips>
194+ pass in all
195+ pass out all
196+ """
197+ temp_path = Path (tempfile .gettempdir ()) / "simple_fw.conf"
198+ temp_path .write_text (pf_conf )
199+
200+ subprocess .run (["sudo" , "pfctl" , "-f" , str (temp_path )],
201+ capture_output = True )
202+ except Exception as e :
203+ self .logger .error (f"Failed to reload pfctl rules: { e } " )
204+
165205 def _block_ip_macos (self , ip : str ) -> bool :
166- """Block IP using pfctl on macOS"""
167- cmd1 = ['sudo' , 'pfctl' , '-t' , 'blocked_ips' , '-T' , 'add' , ip ]
168- result1 = subprocess .run (cmd1 , capture_output = True , text = True )
169- cmd2 = ['sudo' , 'pfctl' , '-e' ]
170- subprocess .run (cmd2 , capture_output = True , text = True )
171- return result1 .returncode == 0
172-
206+ cmd = ["sudo" , "pfctl" , "-t" , "blocked_ips" , "-T" , "add" , ip ]
207+ res = subprocess .run (cmd , capture_output = True , text = True )
208+ return res .returncode == 0 or "already" in res .stderr .lower ()
209+
173210 def _unblock_ip_macos (self , ip : str ) -> bool :
174- """Unblock IP using pfctl on macOS"""
175- cmd = [ 'sudo' , 'pfctl' , '-t' , 'blocked_ips' , '-T' , 'delete' , ip ]
176- result = subprocess . run ( cmd , capture_output = True , text = True )
177- return result . returncode == 0
178-
211+ cmd = [ "sudo" , " pfctl" , "-t" , "blocked_ips" , "-T" , "delete" , ip ]
212+ res = subprocess . run ( cmd , capture_output = True , text = True )
213+ return res . returncode == 0 or "not found" in res . stderr . lower ( )
214+
215+
179216 def _block_ip_windows (self , ip : str ) -> bool :
180- """Block IP using Windows Firewall (netsh)"""
181- rule_name = f"SimpleFirewall_Block_{ ip .replace ('.' , '_' )} "
217+ rule_name = f"SimpleFirewall_Block_{ ip .replace (':' , '_' ).replace ('.' , '_' )} "
182218 cmd = [
183- 'netsh' , 'advfirewall' , 'firewall' , 'add' , 'rule' ,
184- f'name={ rule_name } ' ,
185- 'dir=in' ,
186- 'action=block' ,
187- f'remoteip={ ip } '
219+ "netsh" , "advfirewall" , "firewall" , "add" , "rule" ,
220+ f"name={ rule_name } " ,
221+ "dir=in" , "action=block" , f"remoteip={ ip } "
188222 ]
189- try :
190- result = subprocess .run (cmd , capture_output = True , text = True )
191- if result .returncode == 0 :
192- self .logger .debug (f"netsh add rule stdout: { result .stdout .strip ()} " )
193- return True
194- else :
195- self .logger .error (f"netsh add rule failed: rc={ result .returncode } stdout={ result .stdout .strip ()} stderr={ result .stderr .strip ()} " )
196- return False
197- except Exception as e :
198- self .logger .error (f"Exception when running netsh add rule: { e } " )
199- return False
223+ res = subprocess .run (cmd , capture_output = True , text = True )
224+ return res .returncode == 0
200225
201226 def _unblock_ip_windows (self , ip : str ) -> bool :
202- """Unblock IP using Windows Firewall (netsh)"""
203- rule_name = f"SimpleFirewall_Block_{ ip .replace ('.' , '_' )} "
204- cmd = [
205- 'netsh' , 'advfirewall' , 'firewall' , 'delete' , 'rule' ,
206- f'name={ rule_name } '
207- ]
208- try :
209- result = subprocess .run (cmd , capture_output = True , text = True )
210- if result .returncode == 0 :
211- self .logger .debug (f"netsh delete rule stdout: { result .stdout .strip ()} " )
212- return True
213- else :
214- self .logger .error (f"netsh delete rule failed: rc={ result .returncode } stdout={ result .stdout .strip ()} stderr={ result .stderr .strip ()} " )
215- return False
216- except Exception as e :
217- self .logger .error (f"Exception when running netsh delete rule: { e } " )
218- return False
227+ rule_name = f"SimpleFirewall_Block_{ ip .replace (':' , '_' ).replace ('.' , '_' )} "
228+ cmd = ["netsh" , "advfirewall" , "firewall" , "delete" , "rule" ,
229+ f"name={ rule_name } " ]
230+ res = subprocess .run (cmd , capture_output = True , text = True )
231+ return res .returncode == 0
232+
219233
220234 def get_blocked_ips (self ) -> Dict [str , str ]:
221- """Get currently blocked IPs with their block times"""
222235 with self .lock :
223- return {
224- ip : block_time .isoformat ()
225- for ip , block_time in self .blocked_ips .items ()
226- }
227-
236+ return {ip : t .isoformat () for ip , t in self .blocked_ips .items ()}
237+
228238 def cleanup_all_blocks (self ) -> List [str ]:
229- """Remove all blocks (useful for shutdown)"""
230- cleaned_ips = []
231-
232- with self .lock :
233- for ip in list (self .blocked_ips .keys ()):
234- if self .unblock_ip (ip ):
235- cleaned_ips .append (ip )
236-
237- return cleaned_ips
238-
239+ cleaned = []
240+ for ip in list (self .blocked_ips .keys ()):
241+ if self .unblock_ip (ip ):
242+ cleaned .append (ip )
243+
244+ # macOS table flush
245+ if self .platform == "darwin" and cleaned :
246+ subprocess .run (["sudo" , "pfctl" , "-t" , "blocked_ips" , "-T" , "flush" ])
247+
248+ return cleaned
249+
239250 def get_stats (self ) -> Dict [str , int ]:
240- """Get blocking statistics"""
241251 with self .lock :
242252 return {
243- ' currently_blocked' : len (self .blocked_ips ),
244- ' whitelist_size' : len (self .whitelist )
245- }
253+ " currently_blocked" : len (self .blocked_ips ),
254+ " whitelist_size" : len (self .whitelist )
255+ }
0 commit comments