以下是完整的 Python 验证客户端实现,仅依赖 requests 库。
pip install requests
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
网络验证系统 Python SDK
"""
import hmac
import hashlib
import time
import random
import string
import uuid
import requests
import json
import threading
from typing import Optional, Dict, Any, Callable
class AuthClient:
"""验证客户端"""
def __init__(self, app_key: str, app_secret: str, transfer_key: str, api_url: str):
"""
初始化客户端
Args:
app_key: 应用密钥
app_secret: 应用签名密钥
transfer_key: 传输加密密钥
api_url: API地址
"""
self.app_key = app_key
self.app_secret = app_secret
self.transfer_key = transfer_key
self.api_url = api_url
# 登录状态
self.token = ""
self.exec_token = ""
self.username = ""
self.expire_time = ""
self.points = 0
self.heartbeat_interval = 30
# 错误信息
self.last_code = 0
self.last_msg = ""
# 心跳线程
self._heartbeat_thread: Optional[threading.Thread] = None
self._heartbeat_running = False
self._heartbeat_callback: Optional[Callable[[bool], None]] = None
self._machine_code = ""
@staticmethod
def _rc4(data: bytes, key: bytes) -> bytes:
"""RC4 加密/解密"""
S = list(range(256))
j = 0
for i in range(256):
j = (j + S[i] + key[i % len(key)]) % 256
S[i], S[j] = S[j], S[i]
result = bytearray(len(data))
i = j = 0
for k in range(len(data)):
i = (i + 1) % 256
j = (j + S[i]) % 256
S[i], S[j] = S[j], S[i]
result[k] = data[k] ^ S[(S[i] + S[j]) % 256]
return bytes(result)
@staticmethod
def _hmac_sha256(data: str, key: str) -> str:
"""HMAC-SHA256 签名"""
return hmac.new(
key.encode('utf-8'),
data.encode('utf-8'),
hashlib.sha256
).hexdigest()
@staticmethod
def _generate_nonce(length: int = 16) -> str:
"""生成随机字符串"""
chars = string.ascii_letters + string.digits
return ''.join(random.choice(chars) for _ in range(length))
@staticmethod
def _get_timestamp() -> int:
"""获取Unix时间戳"""
return int(time.time())
def _build_request(self, params: Dict[str, str]) -> str:
"""构建加密请求"""
# 按key排序拼接参数
sorted_params = sorted(params.items())
data = '&'.join(f'{k}={v}' for k, v in sorted_params)
# 计算签名
sign = self._hmac_sha256(data, self.app_secret)
data += f'&sign={sign}'
# RC4加密并转HEX
encrypted = self._rc4(data.encode('utf-8'), self.transfer_key.encode('utf-8'))
return encrypted.hex()
def _decrypt_response(self, response: str) -> str:
"""解密响应"""
if not response or response.startswith('{'):
return response # 明文响应
# HEX解码并RC4解密
encrypted = bytes.fromhex(response)
decrypted = self._rc4(encrypted, self.transfer_key.encode('utf-8'))
return decrypted.decode('utf-8')
def _send_request(self, params: Dict[str, str]) -> Optional[Dict[str, Any]]:
"""发送请求"""
try:
enc_data = self._build_request(params)
url = f'{self.api_url}?app_key={self.app_key}&data={enc_data}'
response = requests.get(url, timeout=30)
decrypted = self._decrypt_response(response.text)
result = json.loads(decrypted)
self.last_code = result.get('code', -1)
self.last_msg = result.get('msg', '')
return result
except Exception as e:
self.last_code = -1
self.last_msg = str(e)
return None
def init(self, machine_code: str, version: str = "") -> bool:
"""
初始化
Args:
machine_code: 机器码
version: 客户端版本号
Returns:
是否成功
"""
params = {
'action': 'init',
'machine_code': machine_code,
'nonce': self._generate_nonce(),
'timestamp': str(self._get_timestamp())
}
if version:
params['version'] = version
result = self._send_request(params)
if not result or self.last_code != 1:
return False
self.heartbeat_interval = result.get('heartbeat_interval', 30)
if self.heartbeat_interval <= 0:
self.heartbeat_interval = 30
return True
def login(self, machine_code: str, username: str, password: str = "0") -> bool:
"""
登录(统一接口)
Args:
machine_code: 机器码
username: 用户名或卡密
password: 密码(卡密登录传"0")
Returns:
是否成功
"""
self._machine_code = machine_code
params = {
'action': 'login',
'machine_code': machine_code,
'nonce': self._generate_nonce(),
'password': password,
'timestamp': str(self._get_timestamp()),
'username': username
}
result = self._send_request(params)
if not result or self.last_code != 2:
return False
self.token = result.get('token', '')
self.exec_token = result.get('exec_token', '')
self.username = result.get('username', '')
self.expire_time = result.get('expire_time', '')
self.points = result.get('points', 0)
return True
def heartbeat(self, machine_code: str = "") -> bool:
"""
心跳
Args:
machine_code: 机器码(可选)
Returns:
是否成功
"""
params = {
'action': 'heartbeat',
'timestamp': str(self._get_timestamp()),
'token': self.token
}
if machine_code:
params['machine_code'] = machine_code
result = self._send_request(params)
if not result or self.last_code != 3:
return False
self.expire_time = result.get('expire_time', self.expire_time)
self.points = result.get('points', self.points)
self.exec_token = result.get('exec_token', self.exec_token)
return True
def logout(self, machine_code: str = "") -> bool:
"""
登出
Args:
machine_code: 机器码(可选)
Returns:
是否成功
"""
self.stop_heartbeat()
params = {
'action': 'logout',
'timestamp': str(self._get_timestamp()),
'token': self.token
}
if machine_code:
params['machine_code'] = machine_code
self._send_request(params)
return self.last_code == 4
def deduct(self, points: int, request_id: str = "", machine_code: str = "") -> bool:
"""
扣点
Args:
points: 扣除点数
request_id: 请求ID(用于幂等性)
machine_code: 机器码(可选)
Returns:
是否成功
"""
params = {
'action': 'deduct',
'points': str(points),
'timestamp': str(self._get_timestamp()),
'token': self.token
}
if request_id:
params['request_id'] = request_id
else:
params['request_id'] = str(uuid.uuid4())
if machine_code:
params['machine_code'] = machine_code
result = self._send_request(params)
if not result or self.last_code != 8:
return False
self.points = result.get('points', self.points)
return True
def recharge(self, card_no: str, username: str = "") -> bool:
"""
充值
Args:
card_no: 卡密
username: 用户名(未登录时使用)
Returns:
是否成功
"""
params = {
'action': 'recharge',
'card_no': card_no,
'timestamp': str(self._get_timestamp())
}
if self.token:
params['token'] = self.token
elif username:
params['username'] = username
result = self._send_request(params)
if not result or self.last_code != 5:
return False
self.expire_time = result.get('expire_time', self.expire_time)
self.points = result.get('points', self.points)
return True
def verify(self, machine_code: str = "") -> bool:
"""
验证Token
Args:
machine_code: 机器码(可选)
Returns:
是否有效
"""
params = {
'action': 'verify',
'timestamp': str(self._get_timestamp()),
'token': self.token
}
if machine_code:
params['machine_code'] = machine_code
result = self._send_request(params)
return result is not None and self.last_code == 9
def start_heartbeat(self, callback: Optional[Callable[[bool], None]] = None):
"""
启动心跳线程
Args:
callback: 心跳回调函数,参数为是否成功
"""
if self._heartbeat_running:
return
self._heartbeat_running = True
self._heartbeat_callback = callback
def heartbeat_loop():
while self._heartbeat_running:
time.sleep(self.heartbeat_interval)
if not self._heartbeat_running:
break
success = self.heartbeat(self._machine_code)
if self._heartbeat_callback:
self._heartbeat_callback(success)
if not success:
self._heartbeat_running = False
break
self._heartbeat_thread = threading.Thread(target=heartbeat_loop, daemon=True)
self._heartbeat_thread.start()
def stop_heartbeat(self):
"""停止心跳线程"""
self._heartbeat_running = False
if self._heartbeat_thread and self._heartbeat_thread.is_alive():
self._heartbeat_thread.join(timeout=2)
def get_machine_code() -> str:
"""
获取机器码(示例实现)
实际项目中应该获取真实的硬件信息
"""
import platform
import hashlib
info = f"{platform.node()}-{platform.machine()}-{platform.processor()}"
return hashlib.md5(info.encode()).hexdigest().upper()
# 使用示例
if __name__ == '__main__':
# 配置
APP_KEY = "your_app_key"
APP_SECRET = "your_app_secret"
TRANSFER_KEY = "your_transfer_key"
API_URL = "https://your-server.com/api/v1/client/enc"
client = AuthClient(APP_KEY, APP_SECRET, TRANSFER_KEY, API_URL)
machine_code = get_machine_code()
# 1. 初始化
print("正在初始化...")
if not client.init(machine_code, "1.0.0"):
print(f"初始化失败: {client.last_msg}")
exit(1)
print(f"初始化成功,心跳间隔: {client.heartbeat_interval}秒")
# 2. 登录
card_no = input("请输入卡密: ").strip()
print("正在登录...")
if not client.login(machine_code, card_no, "0"):
print(f"登录失败: {client.last_msg} (code: {client.last_code})")
exit(1)
print("登录成功!")
print(f"用户名: {client.username}")
print(f"到期时间: {client.expire_time}")
print(f"剩余点数: {client.points}")
# 3. 启动心跳
def on_heartbeat(success: bool):
if not success:
print(f"\n心跳失败: {client.last_msg}")
client.start_heartbeat(on_heartbeat)
# 4. 主循环
print("\n输入 q 退出,输入 d 扣点...")
try:
while True:
cmd = input().strip().lower()
if cmd == 'q':
break
elif cmd == 'd':
if client.deduct(1):
print(f"扣点成功,剩余: {client.points}")
else:
print(f"扣点失败: {client.last_msg}")
except KeyboardInterrupt:
pass
# 5. 退出
client.logout(machine_code)
print("已退出登录")
import asyncio
import aiohttp
class AsyncAuthClient:
"""异步验证客户端"""
def __init__(self, app_key: str, app_secret: str, transfer_key: str, api_url: str):
self.app_key = app_key
self.app_secret = app_secret
self.transfer_key = transfer_key
self.api_url = api_url
self.token = ""
self.points = 0
# ... 其他属性同上
async def _send_request(self, params: dict) -> dict:
enc_data = self._build_request(params) # 同上
url = f'{self.api_url}?app_key={self.app_key}&data={enc_data}'
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=30) as response:
text = await response.text()
decrypted = self._decrypt_response(text)
return json.loads(decrypted)
async def login(self, machine_code: str, username: str, password: str = "0") -> bool:
params = {
'action': 'login',
'machine_code': machine_code,
'password': password,
'timestamp': str(int(time.time())),
'username': username
}
result = await self._send_request(params)
if result.get('code') == 2:
self.token = result.get('token', '')
self.points = result.get('points', 0)
return True
return False
# 使用
async def main():
client = AsyncAuthClient(APP_KEY, APP_SECRET, TRANSFER_KEY, API_URL)
await client.login(machine_code, card_no, "0")
asyncio.run(main())