From 88b92b8dc8fd299538db865424082193e6d6b130 Mon Sep 17 00:00:00 2001 From: alantang <107459091+alantang1977@users.noreply.github.com> Date: Thu, 17 Apr 2025 13:50:43 +0800 Subject: [PATCH] =?UTF-8?q?Create=20=E7=81=AB=E8=BD=A6=E5=A4=AA=E9=A1=BAAP?= =?UTF-8?q?P.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- py/火车太顺APP.py | 301 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 301 insertions(+) create mode 100644 py/火车太顺APP.py diff --git a/py/火车太顺APP.py b/py/火车太顺APP.py new file mode 100644 index 00000000..6a801796 --- /dev/null +++ b/py/火车太顺APP.py @@ -0,0 +1,301 @@ +# -*- coding: utf-8 -*- +# by @嗷呜 +import sys +from urllib.parse import urlparse +sys.path.append("..") +import re +import hashlib +import hmac +import random +import string +from Crypto.Util.Padding import unpad +from concurrent.futures import ThreadPoolExecutor +from Crypto.PublicKey import RSA +from Crypto.Cipher import PKCS1_v1_5, AES +from base64 import b64encode, b64decode +import json +import time +from base.spider import Spider + + +class Spider(Spider): + + def init(self, extend=""): + self.device = self.device_id() + self.host = self.gethost() + pass + + def getName(self): + pass + + def isVideoFormat(self, url): + pass + + def manualVideoCheck(self): + pass + + def action(self, action): + pass + + def destroy(self): + pass + + def homeContent(self, filter): + result = {} + filters = {} + classes = [] + bba = self.url() + data = self.fetch(f"{self.host}/api/v1/app/config?pack={bba[0]}&signature={bba[1]}", headers=self.header()).text + data1 = self.aes(data) + dy = {"class": "类型", "area": "地区", "lang": "语言", "year": "年份", "letter": "字母", "by": "排序", + "sort": "排序"} + data1['data']['movie_screen']['sort'].pop(0) + for item in data1['data']['movie_screen']['sort']: + item['n'] = item.pop('name') + item['v'] = item.pop('value') + for item in data1['data']['movie_screen']['filter']: + has_non_empty_field = False + classes.append({"type_name": item["name"], "type_id": str(item["id"])}) + for key in dy: + if key in item and item[key]: + has_non_empty_field = True + break + if has_non_empty_field: + filters[str(item["id"])] = [] + filters[str(item["id"])].append( + {"key": 'sort', "name": '排序', "value": data1['data']['movie_screen']['sort']}) + for dkey in item: + if dkey in dy and item[dkey]: + item[dkey].pop(0) + value_array = [ + {"n": value.strip(), "v": value.strip()} + for value in item[dkey] + if value.strip() != "" + ] + filters[str(item["id"])].append( + {"key": dkey, "name": dy[dkey], "value": value_array} + ) + result["class"] = classes + result["filters"] = filters + return result + + def homeVideoContent(self): + bba = self.url() + url = f'{self.host}/api/v1/movie/index_recommend?pack={bba[0]}&signature={bba[1]}' + data = self.fetch(url, headers=self.header()).json() + videos = [] + for item in data['data']: + if len(item['list']) > 0: + for it in item['list']: + try: + videos.append(self.voides(it)) + except Exception as e: + continue + result = {"list": videos} + return result + + def categoryContent(self, tid, pg, filter, extend): + body = {"type_id": tid, "sort": extend.get("sort", "by_default"), "class": extend.get("class", "类型"), + "area": extend.get("area", "地区"), "year": extend.get("year", "年份"), "page": str(pg), + "pageSize": "21"} + result = {} + list = [] + bba = self.url(body) + url = f"{self.host}/api/v1/movie/screen/list?pack={bba[0]}&signature={bba[1]}" + data = self.fetch(url, headers=self.header()).json()['data']['list'] + for item in data: + list.append(self.voides(item)) + result["list"] = list + result["page"] = pg + result["pagecount"] = 9999 + result["limit"] = 90 + result["total"] = 999999 + return result + + def detailContent(self, ids): + body = {"id": ids[0]} + bba = self.url(body) + url = f'{self.host}/api/v1/movie/detail?pack={bba[0]}&signature={bba[1]}' + data = self.fetch(url, headers=self.header()).json()['data'] + video = {'vod_name': data.get('name'), 'type_name': data.get('type_name'), 'vod_year': data.get('year'), + 'vod_area': data.get('area'), 'vod_remarks': data.get('dynami'), 'vod_content': data.get('content')} + play = [] + names = [] + tasks = [] + for itt in data["play_from"]: + name = itt["name"] + a = [] + if len(itt["list"]) > 0: + names.append(name) + play.append(self.playeach(itt['list'])) + else: + tasks.append({"movie_id": ids[0], "from_code": itt["code"]}) + names.append(name) + if tasks: + with ThreadPoolExecutor(max_workers=len(tasks)) as executor: + results = executor.map(self.playlist, tasks) + for result in results: + if result: + play.append(result) + else: + play.append("") + video["vod_play_from"] = "$$$".join(names) + video["vod_play_url"] = "$$$".join(play) + result = {"list": [video]} + return result + + def searchContent(self, key, quick, pg=1): + body = {"keyword": key, "sort": "", "type_id": "0", "page": str(pg), "pageSize": "10", + "res_type": "by_movie_name"} + bba = self.url(body) + url = f"{self.host}/api/v1/movie/search?pack={bba[0]}&signature={bba[1]}" + data = self.fetch(url, headers=self.header()).json()['data'].get('list') + videos = [] + for it in data: + try: + videos.append(self.voides(it)) + except Exception as e: + continue + result = {"list": videos, "page": pg} + return result + + def playerContent(self, flag, id, vipFlags): + url = id + if not re.search(r"\.m3u8|\.mp4", url): + try: + data = json.loads(b64decode(id.encode('utf-8')).decode('utf-8')) + bba = self.url(data) + data2 = self.fetch(f"{self.host}/api/v1/movie_addr/parse_url?pack={bba[0]}&signature={bba[1]}", + headers=self.header()).json()['data'] + url = data2.get('play_url') or data2.get('download_url') + except Exception as e: + pass + if re.search(r'\.jpg|\.png|\.jpeg', url): + url = self.Mproxy(url) + result = {} + result["parse"] = 0 + result["url"] = url + result["header"] = {'user-agent': 'okhttp/4.9.2'} + return result + + def localProxy(self, param): + return self.Mlocal(param) + + def Mproxy(self, url): + return self.getProxyUrl() + "&url=" + b64encode(url.encode('utf-8')).decode('utf-8') + "&type=m3u8" + + def Mlocal(self, param,header=None): + url = self.d64(param["url"]) + ydata = self.fetch(url, headers=header, allow_redirects=False) + data = ydata.content.decode('utf-8') + if ydata.headers.get('Location'): + url = ydata.headers['Location'] + data = self.fetch(url, headers=header).content.decode('utf-8') + parsed_url = urlparse(url) + durl = parsed_url.scheme + "://" + parsed_url.netloc + lines = data.strip().split('\n') + for index, string in enumerate(lines): + if '#EXT' not in string and 'http' not in string: + last_slash_index = string.rfind('/') + lpath = string[:last_slash_index + 1] + lines[index] = durl + ('' if lpath.startswith('/') else '/') + lpath + data = '\n'.join(lines) + return [200, "application/vnd.apple.mpegur", data] + + def device_id(self): + characters = string.ascii_lowercase + string.digits + random_string = ''.join(random.choices(characters, k=32)) + return random_string + + def gethost(self): + try: + url = 'https://dns.alidns.com/dns-query' + headers = { + 'User-Agent': 'okhttp/4.9.2', + 'Accept': 'application/dns-message' + } + params = { + 'dns': 'AAABAAABAAAAAAAACWJmbTExYXM5ZgdmdXFpeXVuAmNuAAAcAAE' + } + response = self.fetch(url, headers=headers, params=params) + host=self.parse_dns_name(response.content, 12) + return f"https://{host}" + except: + return "https://bfm11as9f.fuqiyun.cn" + + def parse_dns_name(self, data, offset): + parts = [] + while True: + length = data[offset] + if length == 0: + break + offset += 1 + parts.append(data[offset:offset + length].decode('utf-8')) + offset += length + return '.'.join(parts) + + def header(self): + headers = { + 'User-Agent': 'Android', + 'Accept': 'application/prs.55App.v2+json', + 'timestamp': str(int(time.time())), + 'x-client-setting': '{"pure-mode":0}', + 'x-client-uuid': '{"device_id":' + self.device + '}, "type":1,"brand":"Redmi", "model":"M2012K10C", "system_version":30, "sdk_version":"3.1.0.7"}', + 'x-client-version': '3096 ' + } + return headers + + def url(self, id=None): + if not id: + id = {} + id["timestamp"] = str(int(time.time())) + public_key = 'MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA02F/kPg5A2NX4qZ5JSns+bjhVMCC6JbTiTKpbgNgiXU+Kkorg6Dj76gS68gB8llhbUKCXjIdygnHPrxVHWfzmzisq9P9awmXBkCk74Skglx2LKHa/mNz9ivg6YzQ5pQFUEWS0DfomGBXVtqvBlOXMCRxp69oWaMsnfjnBV+0J7vHbXzUIkqBLdXSNfM9Ag5qdRDrJC3CqB65EJ3ARWVzZTTcXSdMW9i3qzEZPawPNPe5yPYbMZIoXLcrqvEZnRK1oak67/ihf7iwPJqdc+68ZYEmmdqwunOvRdjq89fQMVelmqcRD9RYe08v+xDxG9Co9z7hcXGTsUquMxkh29uNawIDAQAB' + encrypted_text = json.dumps(id) + public_key = RSA.import_key(b64decode(public_key)) + cipher = PKCS1_v1_5.new(public_key) + encrypted_message = cipher.encrypt(encrypted_text.encode('utf-8')) + encrypted_message_base64 = b64encode(encrypted_message).decode('utf-8') + result = encrypted_message_base64.replace('+', '-').replace('/', '_').replace('=', '') + key = '635a580fcb5dc6e60caa39c31a7bde48' + sign = hmac.new(key.encode(), result.encode(), hashlib.md5).hexdigest() + return result, sign + + def playlist(self, body): + try: + bba = self.url(body) + url = f'{self.host}/api/v1/movie_addr/list?pack={bba[0]}&signature={bba[1]}' + data = self.fetch(url, headers=self.header()).json()['data'] + return self.playeach(data) + except Exception: + return [] + + def playeach(self, data): + play_urls = [] + for it in data: + if re.search(r"mp4|m3u8", it["play_url"]): + play_urls.append(f"{it['episode_name']}${it['play_url']}") + else: + vd={"from_code": it['from_code'], "play_url": it['play_url'], "episode_id": it['episode_id'], "type": "play"} + play_urls.append( + f"{it['episode_name']}${b64encode(json.dumps(vd).encode('utf-8')).decode('utf-8')}" + ) + return '#'.join(play_urls) + + def voides(self, item): + if item['name'] or item['title']: + voide = { + "vod_id": item.get('id') or item.get('click'), + 'vod_name': item.get('name') or item.get('title'), + 'vod_pic': item.get('cover') or item.get('image'), + 'vod_year': item.get('year') or item.get('label'), + 'vod_remarks': item.get('dynamic') or item.get('sub_title') + } + return voide + + def aes(self, text): + text = text.replace('-', '+').replace('_', '/') + '==' + key = b"e6d5de5fcc51f53d" + iv = b"2f13eef7dfc6c613" + cipher = AES.new(key, AES.MODE_CBC, iv) + pt = unpad(cipher.decrypt(b64decode(text)), AES.block_size).decode("utf-8") + return json.loads(pt)