Python SDK 完整示例

以下是完整的 Python 验证客户端实现,仅依赖 requests 库。

安装依赖

pip install requests

完整代码 (auth_client.py)

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
网络验证系统 Python SDK
"""

import hmac
import hashlib
import time
import random
import string
import uuid
import requests
import json
import threading
from typing import Optional, Dict, Any, Callable


class AuthClient:
    """验证客户端"""
    
    def __init__(self, app_key: str, app_secret: str, transfer_key: str, api_url: str):
        """
        初始化客户端
        
        Args:
            app_key: 应用密钥
            app_secret: 应用签名密钥
            transfer_key: 传输加密密钥
            api_url: API地址
        """
        self.app_key = app_key
        self.app_secret = app_secret
        self.transfer_key = transfer_key
        self.api_url = api_url
        
        # 登录状态
        self.token = ""
        self.exec_token = ""
        self.username = ""
        self.expire_time = ""
        self.points = 0
        self.heartbeat_interval = 30
        
        # 错误信息
        self.last_code = 0
        self.last_msg = ""
        
        # 心跳线程
        self._heartbeat_thread: Optional[threading.Thread] = None
        self._heartbeat_running = False
        self._heartbeat_callback: Optional[Callable[[bool], None]] = None
        self._machine_code = ""
    
    @staticmethod
    def _rc4(data: bytes, key: bytes) -> bytes:
        """RC4 加密/解密"""
        S = list(range(256))
        j = 0
        for i in range(256):
            j = (j + S[i] + key[i % len(key)]) % 256
            S[i], S[j] = S[j], S[i]
        
        result = bytearray(len(data))
        i = j = 0
        for k in range(len(data)):
            i = (i + 1) % 256
            j = (j + S[i]) % 256
            S[i], S[j] = S[j], S[i]
            result[k] = data[k] ^ S[(S[i] + S[j]) % 256]
        return bytes(result)
    
    @staticmethod
    def _hmac_sha256(data: str, key: str) -> str:
        """HMAC-SHA256 签名"""
        return hmac.new(
            key.encode('utf-8'),
            data.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()
    
    @staticmethod
    def _generate_nonce(length: int = 16) -> str:
        """生成随机字符串"""
        chars = string.ascii_letters + string.digits
        return ''.join(random.choice(chars) for _ in range(length))
    
    @staticmethod
    def _get_timestamp() -> int:
        """获取Unix时间戳"""
        return int(time.time())
    
    def _build_request(self, params: Dict[str, str]) -> str:
        """构建加密请求"""
        # 按key排序拼接参数
        sorted_params = sorted(params.items())
        data = '&'.join(f'{k}={v}' for k, v in sorted_params)
        
        # 计算签名
        sign = self._hmac_sha256(data, self.app_secret)
        data += f'&sign={sign}'
        
        # RC4加密并转HEX
        encrypted = self._rc4(data.encode('utf-8'), self.transfer_key.encode('utf-8'))
        return encrypted.hex()
    
    def _decrypt_response(self, response: str) -> str:
        """解密响应"""
        if not response or response.startswith('{'):
            return response  # 明文响应
        
        # HEX解码并RC4解密
        encrypted = bytes.fromhex(response)
        decrypted = self._rc4(encrypted, self.transfer_key.encode('utf-8'))
        return decrypted.decode('utf-8')
    
    def _send_request(self, params: Dict[str, str]) -> Optional[Dict[str, Any]]:
        """发送请求"""
        try:
            enc_data = self._build_request(params)
            url = f'{self.api_url}?app_key={self.app_key}&data={enc_data}'
            
            response = requests.get(url, timeout=30)
            decrypted = self._decrypt_response(response.text)
            
            result = json.loads(decrypted)
            self.last_code = result.get('code', -1)
            self.last_msg = result.get('msg', '')
            
            return result
        except Exception as e:
            self.last_code = -1
            self.last_msg = str(e)
            return None
    
    def init(self, machine_code: str, version: str = "") -> bool:
        """
        初始化
        
        Args:
            machine_code: 机器码
            version: 客户端版本号
            
        Returns:
            是否成功
        """
        params = {
            'action': 'init',
            'machine_code': machine_code,
            'nonce': self._generate_nonce(),
            'timestamp': str(self._get_timestamp())
        }
        if version:
            params['version'] = version
        
        result = self._send_request(params)
        if not result or self.last_code != 1:
            return False
        
        self.heartbeat_interval = result.get('heartbeat_interval', 30)
        if self.heartbeat_interval <= 0:
            self.heartbeat_interval = 30
        
        return True
    
    def login(self, machine_code: str, username: str, password: str = "0") -> bool:
        """
        登录(统一接口)
        
        Args:
            machine_code: 机器码
            username: 用户名或卡密
            password: 密码(卡密登录传"0")
            
        Returns:
            是否成功
        """
        self._machine_code = machine_code
        
        params = {
            'action': 'login',
            'machine_code': machine_code,
            'nonce': self._generate_nonce(),
            'password': password,
            'timestamp': str(self._get_timestamp()),
            'username': username
        }
        
        result = self._send_request(params)
        if not result or self.last_code != 2:
            return False
        
        self.token = result.get('token', '')
        self.exec_token = result.get('exec_token', '')
        self.username = result.get('username', '')
        self.expire_time = result.get('expire_time', '')
        self.points = result.get('points', 0)
        
        return True
    
    def heartbeat(self, machine_code: str = "") -> bool:
        """
        心跳
        
        Args:
            machine_code: 机器码(可选)
            
        Returns:
            是否成功
        """
        params = {
            'action': 'heartbeat',
            'timestamp': str(self._get_timestamp()),
            'token': self.token
        }
        if machine_code:
            params['machine_code'] = machine_code
        
        result = self._send_request(params)
        if not result or self.last_code != 3:
            return False
        
        self.expire_time = result.get('expire_time', self.expire_time)
        self.points = result.get('points', self.points)
        self.exec_token = result.get('exec_token', self.exec_token)
        
        return True
    
    def logout(self, machine_code: str = "") -> bool:
        """
        登出
        
        Args:
            machine_code: 机器码(可选)
            
        Returns:
            是否成功
        """
        self.stop_heartbeat()
        
        params = {
            'action': 'logout',
            'timestamp': str(self._get_timestamp()),
            'token': self.token
        }
        if machine_code:
            params['machine_code'] = machine_code
        
        self._send_request(params)
        return self.last_code == 4
    
    def deduct(self, points: int, request_id: str = "", machine_code: str = "") -> bool:
        """
        扣点
        
        Args:
            points: 扣除点数
            request_id: 请求ID(用于幂等性)
            machine_code: 机器码(可选)
            
        Returns:
            是否成功
        """
        params = {
            'action': 'deduct',
            'points': str(points),
            'timestamp': str(self._get_timestamp()),
            'token': self.token
        }
        if request_id:
            params['request_id'] = request_id
        else:
            params['request_id'] = str(uuid.uuid4())
        if machine_code:
            params['machine_code'] = machine_code
        
        result = self._send_request(params)
        if not result or self.last_code != 8:
            return False
        
        self.points = result.get('points', self.points)
        return True
    
    def recharge(self, card_no: str, username: str = "") -> bool:
        """
        充值
        
        Args:
            card_no: 卡密
            username: 用户名(未登录时使用)
            
        Returns:
            是否成功
        """
        params = {
            'action': 'recharge',
            'card_no': card_no,
            'timestamp': str(self._get_timestamp())
        }
        if self.token:
            params['token'] = self.token
        elif username:
            params['username'] = username
        
        result = self._send_request(params)
        if not result or self.last_code != 5:
            return False
        
        self.expire_time = result.get('expire_time', self.expire_time)
        self.points = result.get('points', self.points)
        return True
    
    def verify(self, machine_code: str = "") -> bool:
        """
        验证Token
        
        Args:
            machine_code: 机器码(可选)
            
        Returns:
            是否有效
        """
        params = {
            'action': 'verify',
            'timestamp': str(self._get_timestamp()),
            'token': self.token
        }
        if machine_code:
            params['machine_code'] = machine_code
        
        result = self._send_request(params)
        return result is not None and self.last_code == 9
    
    def start_heartbeat(self, callback: Optional[Callable[[bool], None]] = None):
        """
        启动心跳线程
        
        Args:
            callback: 心跳回调函数,参数为是否成功
        """
        if self._heartbeat_running:
            return
        
        self._heartbeat_running = True
        self._heartbeat_callback = callback
        
        def heartbeat_loop():
            while self._heartbeat_running:
                time.sleep(self.heartbeat_interval)
                if not self._heartbeat_running:
                    break
                
                success = self.heartbeat(self._machine_code)
                if self._heartbeat_callback:
                    self._heartbeat_callback(success)
                
                if not success:
                    self._heartbeat_running = False
                    break
        
        self._heartbeat_thread = threading.Thread(target=heartbeat_loop, daemon=True)
        self._heartbeat_thread.start()
    
    def stop_heartbeat(self):
        """停止心跳线程"""
        self._heartbeat_running = False
        if self._heartbeat_thread and self._heartbeat_thread.is_alive():
            self._heartbeat_thread.join(timeout=2)


def get_machine_code() -> str:
    """
    获取机器码(示例实现)
    实际项目中应该获取真实的硬件信息
    """
    import platform
    import hashlib
    
    info = f"{platform.node()}-{platform.machine()}-{platform.processor()}"
    return hashlib.md5(info.encode()).hexdigest().upper()


# 使用示例
if __name__ == '__main__':
    # 配置
    APP_KEY = "your_app_key"
    APP_SECRET = "your_app_secret"
    TRANSFER_KEY = "your_transfer_key"
    API_URL = "https://your-server.com/api/v1/client/enc"
    
    client = AuthClient(APP_KEY, APP_SECRET, TRANSFER_KEY, API_URL)
    machine_code = get_machine_code()
    
    # 1. 初始化
    print("正在初始化...")
    if not client.init(machine_code, "1.0.0"):
        print(f"初始化失败: {client.last_msg}")
        exit(1)
    print(f"初始化成功,心跳间隔: {client.heartbeat_interval}秒")
    
    # 2. 登录
    card_no = input("请输入卡密: ").strip()
    
    print("正在登录...")
    if not client.login(machine_code, card_no, "0"):
        print(f"登录失败: {client.last_msg} (code: {client.last_code})")
        exit(1)
    
    print("登录成功!")
    print(f"用户名: {client.username}")
    print(f"到期时间: {client.expire_time}")
    print(f"剩余点数: {client.points}")
    
    # 3. 启动心跳
    def on_heartbeat(success: bool):
        if not success:
            print(f"\n心跳失败: {client.last_msg}")
    
    client.start_heartbeat(on_heartbeat)
    
    # 4. 主循环
    print("\n输入 q 退出,输入 d 扣点...")
    try:
        while True:
            cmd = input().strip().lower()
            if cmd == 'q':
                break
            elif cmd == 'd':
                if client.deduct(1):
                    print(f"扣点成功,剩余: {client.points}")
                else:
                    print(f"扣点失败: {client.last_msg}")
    except KeyboardInterrupt:
        pass
    
    # 5. 退出
    client.logout(machine_code)
    print("已退出登录")

高级用法:异步版本

import asyncio
import aiohttp

class AsyncAuthClient:
    """异步验证客户端"""
    
    def __init__(self, app_key: str, app_secret: str, transfer_key: str, api_url: str):
        self.app_key = app_key
        self.app_secret = app_secret
        self.transfer_key = transfer_key
        self.api_url = api_url
        self.token = ""
        self.points = 0
        # ... 其他属性同上
    
    async def _send_request(self, params: dict) -> dict:
        enc_data = self._build_request(params)  # 同上
        url = f'{self.api_url}?app_key={self.app_key}&data={enc_data}'
        
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=30) as response:
                text = await response.text()
                decrypted = self._decrypt_response(text)
                return json.loads(decrypted)
    
    async def login(self, machine_code: str, username: str, password: str = "0") -> bool:
        params = {
            'action': 'login',
            'machine_code': machine_code,
            'password': password,
            'timestamp': str(int(time.time())),
            'username': username
        }
        result = await self._send_request(params)
        if result.get('code') == 2:
            self.token = result.get('token', '')
            self.points = result.get('points', 0)
            return True
        return False

# 使用
async def main():
    client = AsyncAuthClient(APP_KEY, APP_SECRET, TRANSFER_KEY, API_URL)
    await client.login(machine_code, card_no, "0")

asyncio.run(main())