# -*- coding: utf-8 -*- # by @嗷呜 import json import sys import time import uuid from base64 import b64decode, b64encode from concurrent.futures import ThreadPoolExecutor, as_completed from Crypto.Cipher import AES from Crypto.Hash import SHA256, MD5 from Crypto.PublicKey import RSA from Crypto.Signature import pkcs1_15 from Crypto.Util.Padding import unpad sys.path.append('..') from base.spider import Spider class Spider(Spider): def init(self, extend=""): self.host, self.appKey, self.rsakey = self.userinfo() pass def getName(self): pass def isVideoFormat(self, url): pass def manualVideoCheck(self): pass def destroy(self): pass def homeContent(self, filter): data = self.fetch(f"{self.host}/api.php/zjv6.vod/types", headers=self.getheader()).json() dy = {"class": "类型", "area": "地区", "lang": "语言", "year": "年份", "letter": "字母", "by": "排序", } filters = {} classes = [] json_data = data['data']['list'] for item in json_data: has_non_empty_field = False jsontype_extend = item["type_extend"] jsontype_extend['by'] = '按更新,按播放,按评分,按收藏' classes.append({"type_name": item["type_name"], "type_id": item["type_id"]}) for key in dy: if key in jsontype_extend and jsontype_extend[key].strip() != "": has_non_empty_field = True break if has_non_empty_field: filters[str(item["type_id"])] = [] for dkey in jsontype_extend: if dkey in dy and jsontype_extend[dkey].strip() != "": values = jsontype_extend[dkey].split(",") sl = {'按更新': 'time', '按播放': 'hits', '按评分': 'score', '按收藏': 'store_num'} value_array = [ {"n": value.strip(), "v": sl[value.strip()] if dkey == "by" else value.strip()} for value in values if value.strip() != "" ] filters[str(item["type_id"])].append( {"key": dkey, "name": dy[dkey], "value": value_array} ) result = {"class": classes, "filters": filters} return result def homeVideoContent(self): data = self.fetch(f"{self.host}/api.php/zjv6.vod/vodPhbAll", headers=self.getheader()).json() return {'list': data['data']['list'][0]['vod_list']} def categoryContent(self, tid, pg, filter, extend): params = { "type": tid, "class": extend.get('class', ''), "lang": extend.get('lang', ''), "area": extend.get('area', ''), "year": extend.get('year', ''), "by": extend.get('by', ''), "page": pg, "limit": "12" } data = self.fetch(f"{self.host}/api.php/zjv6.vod", headers=self.getheader(), params=params).json() result = {} result['list'] = data['data']['list'] result['page'] = pg result['pagecount'] = 9999 result['limit'] = 90 result['total'] = 999999 return result def detailContent(self, ids): data = self.fetch(f"{self.host}/api.php/zjv6.vod/detail?vod_id={ids[0]}&rel_limit=10", headers=self.getheader()).json() vod = data['data'] v, np = {'vod_play_from': [], 'vod_play_url': []}, {} for i in vod['vod_play_list']: n = i['player_info']['show'] np[n] = [] for j in i['urls']: j['parse'] = i['player_info']['parse2'] nm = j.pop('name') np[n].append(f"{nm}${self.e64(json.dumps(j))}") for key, value in np.items(): v['vod_play_from'].append(key) v['vod_play_url'].append('#'.join(value)) v['vod_play_from'] = '$$$'.join(v['vod_play_from']) v['vod_play_url'] = '$$$'.join(v['vod_play_url']) vod.update(v) vod.pop('vod_play_list', None) vod.pop('type', None) return {'list': [vod]} def searchContent(self, key, quick, pg="1"): data = self.fetch(f"{self.host}/api.php/zjv6.vod?page={pg}&limit=20&wd={key}", headers=self.getheader()).json() return {'list': data['data']['list'], 'page': pg} def playerContent(self, flag, id, vipFlags): ids = json.loads(self.d64(id)) target_url = ids['url'] try: parse_str = ids.get('parse', '') if parse_str: parse_urls = parse_str.split(',') result_url = self.try_all_parses(parse_urls, target_url) if result_url: return { 'parse': 0, 'url': result_url, 'header': {'User-Agent': 'dart:io'} } return { 'parse': 1, 'url': target_url, 'header': {'User-Agent': 'dart:io'} } except Exception as e: print(e) return { 'parse': 1, 'url': target_url, 'header': {'User-Agent': 'dart:io'} } def liveContent(self, url): pass def localProxy(self, param): pass def userinfo(self): t = str(int(time.time() * 1000)) uid = self.generate_uid() sign = self.md5(f"appKey=3bbf7348cf314874883a18d6b6fcf67a&uid={uid}&time={t}") headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.106 Safari/537.36', 'Connection': 'Keep-Alive', 'appKey': '3bbf7348cf314874883a18d6b6fcf67a', 'uid': uid, 'time': t, 'sign': sign, } params = { 'access_token': '74d5879931b9774be10dee3d8c51008e', } response = self.fetch('https://gitee.com/api/v5/repos/aycapp/openapi/contents/wawaconf.txt', params=params, headers=headers).json() data = json.loads(self.decrypt(response['content'])) return data['baseUrl'], data['appKey'], data['appSecret'] def e64(self, text): try: text_bytes = text.encode('utf-8') encoded_bytes = b64encode(text_bytes) return encoded_bytes.decode('utf-8') except Exception as e: print(f"Base64编码错误: {str(e)}") return "" def d64(self, encoded_text): try: encoded_bytes = encoded_text.encode('utf-8') decoded_bytes = b64decode(encoded_bytes) return decoded_bytes.decode('utf-8') except Exception as e: print(f"Base64解码错误: {str(e)}") return "" def md5(self, text): h = MD5.new() h.update(text.encode('utf-8')) return h.hexdigest() def generate_uid(self): return uuid.uuid4().hex def getheader(self): t = str(int(time.time() * 1000)) uid = self.generate_uid() sign = self.sign_message(f"appKey={self.appKey}&time={t}&uid={uid}") headers = { 'User-Agent': 'okhttp/4.9.3', 'Connection': 'Keep-Alive', 'uid': uid, 'time': t, 'appKey': self.appKey, 'sign': sign, } return headers def decrypt(self, encrypted_data): key = b64decode('Crm4FXWkk5JItpYirFDpqg==') cipher = AES.new(key, AES.MODE_ECB) encrypted = bytes.fromhex(self.d64(encrypted_data)) decrypted = cipher.decrypt(encrypted) unpadded = unpad(decrypted, AES.block_size) return unpadded.decode('utf-8') def sign_message(self, message): private_key_str = f"-----BEGIN PRIVATE KEY-----\n{self.rsakey}\n-----END PRIVATE KEY-----" private_key = RSA.import_key(private_key_str) message_hash = SHA256.new(message.encode('utf-8')) signature = pkcs1_15.new(private_key).sign(message_hash) signature_b64 = b64encode(signature).decode('utf-8') return signature_b64 def fetch_url(self, parse_url, target_url): try: response = self.fetch(f"{parse_url.replace('..', '.')}{target_url}", headers={"user-agent": "okhttp/4.1.0/luob.app"}, timeout=5) if response.status_code == 200: try: data = response.json() result_url = data.get('url') or data.get('data', {}).get('url') if result_url: return result_url except: pass return None except: return None def try_all_parses(self, parse_urls, target_url): with ThreadPoolExecutor(max_workers=(len(parse_urls))) as executor: future_to_url = { executor.submit(self.fetch_url, parse_url.strip(), target_url): parse_url for parse_url in parse_urls if parse_url.strip() } for future in as_completed(future_to_url): try: result = future.result() if result: return result except: continue return None