# coding=utf-8
# !/usr/bin/python
# by嗷呜
import json
import random
import sys
import time
from base64 import b64decode
from Crypto.Cipher import AES
from Crypto.Hash import MD5
from Crypto.Util.Padding import unpad
sys.path.append('..')
from base.spider import Spider


class Spider(Spider):
    """Spider plugin that reports itself as "小红书".

    API responses carry an ``encData`` field that is decrypted with
    :meth:`aes`. Cover images are served through the local proxy
    (:meth:`localProxy` -> :meth:`action`), which un-scrambles the first
    bytes of each image with :meth:`img`.
    """

    # Base URL of the API backend.
    host = 'https://jhfkdnov21vfd.fhoumpjjih.work'

    # User-Agent sent with every API request (shared by gettoken/headers).
    _APP_UA = ('Mozilla/5.0 (Linux; Android 11; M2012K10C Build/RP1A.200720.011; wv) '
               'AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/87.0.4280.141 '
               'Mobile Safari/537.36;SuiRui/xhs/ver=1.2.6')

    def getName(self):
        """Return the display name of this spider."""
        return "小红书"

    def init(self, extend=""):
        """Create a random device id and obtain a guest token and image host."""
        self.did = self.random_str(32)
        self.token, self.phost = self.gettoken()

    def isVideoFormat(self, url):
        """Not implemented; returns None."""
        pass

    def manualVideoCheck(self):
        """Not implemented; returns None."""
        pass

    def destroy(self):
        """Nothing to release."""
        pass

    def random_str(self, length=16):
        """Return ``length`` random lowercase hexadecimal characters."""
        hex_chars = '0123456789abcdef'
        return ''.join(random.choice(hex_chars) for _ in range(length))

    def md5(self, text: str) -> str:
        """Return the hex MD5 digest of ``text`` encoded as UTF-8."""
        h = MD5.new()
        h.update(text.encode('utf-8'))
        return h.hexdigest()

    def homeContent(self, filter):
        """Fetch the classification list and return it as ``{'class': [...]}``."""
        enc = self.fetch(f'{self.host}/api/video/queryClassifyList?mark=4',
                         headers=self.headers()).json()['encData']
        payload = self.aes(enc)
        classes = [{'type_name': k['classifyTitle'], 'type_id': k['classifyId']}
                   for k in payload['data']]
        return {'class': classes}

    def homeVideoContent(self):
        """Not implemented; returns None."""
        pass

    def categoryContent(self, tid, pg, filter, extend):
        """Return one page (20 items requested) of videos for category ``tid``.

        ``pagecount``/``limit``/``total`` are fixed placeholder values; the
        code does not read real totals from the response.
        """
        path = f'/api/short/video/getShortVideos?classifyId={tid}&videoMark=4&page={pg}&pageSize=20'
        enc = self.fetch(f'{self.host}{path}', headers=self.headers()).json()['encData']
        vdata = self.aes(enc)
        videos = []
        for k in vdata['data']:
            videos.append({
                "vod_id": k['videoId'],
                'vod_name': k.get('title'),
                # Covers go through the local proxy so img() can decode them.
                'vod_pic': self.getProxyUrl() + '&url=' + k['coverImg'],
                'vod_remarks': self.dtim(k.get('playTime')),
            })
        return {"list": videos, "page": pg, "pagecount": 9999, "limit": 90, "total": 999999}

    def detailContent(self, ids):
        """Return the detail entry for ``ids[0]`` with a single play item.

        The play id encodes ``auth_key`` and ``path`` as a query string that
        :meth:`playerContent` appends to the authPath endpoint.
        """
        path = f'/api/video/getVideoById?videoId={ids[0]}'
        enc = self.fetch(f'{self.host}{path}', headers=self.headers()).json()['encData']
        v = self.aes(enc)
        play = f'{v["title"]}$auth_key={v["authKey"]}&path={v["videoUrl"]}'
        vod = {
            'vod_name': v["title"],
            # `or []` tolerates a tagTitles key that is present but null.
            'type_name': ''.join(v.get('tagTitles') or []),
            'vod_play_from': v.get('nickName') or "小红书官方",
            'vod_play_url': play,
        }
        return {"list": [vod]}

    def searchContent(self, key, quick, pg='1'):
        """Not implemented; returns None."""
        pass

    def playerContent(self, flag, id, vipFlags):
        """Return a direct-play descriptor for the play id built in detailContent.

        The token is moved from the ``aut`` header to ``Authorization`` and
        ``deviceid`` is dropped before the headers are handed to the player.
        """
        h = self.headers()
        h['Authorization'] = h.pop('aut')
        del h['deviceid']
        return {"parse": 0, "url": f"{self.host}/api/m3u8/decode/authPath?{id}", "header": h}

    def localProxy(self, param):
        """Local proxy entry point; delegates to :meth:`action`."""
        return self.action(param)

    def aes(self, word):
        """Decrypt a base64 AES-CBC payload and parse it as JSON.

        The 16-byte key is decoded from an embedded base64 constant and is
        also used as the IV.
        """
        key = b64decode("SmhiR2NpT2lKSVV6STFOaQ==")
        cipher = AES.new(key, AES.MODE_CBC, key)
        decrypted = unpad(cipher.decrypt(b64decode(word)), AES.block_size)
        return json.loads(decrypted.decode('utf-8'))

    def dtim(self, seconds):
        """Format a duration in seconds as ``MM:SS`` or ``HH:MM:SS``.

        Returns an empty string when ``seconds`` cannot be converted to int
        (e.g. None or a non-numeric string).
        """
        try:
            total = int(seconds)
        except (TypeError, ValueError, OverflowError):
            return ''
        hours, rest = divmod(total, 3600)
        minutes, secs = divmod(rest, 60)
        if hours > 0:
            return f"{str(hours).zfill(2)}:{str(minutes).zfill(2)}:{str(secs).zfill(2)}"
        return f"{str(minutes).zfill(2)}:{str(secs).zfill(2)}"

    def getsign(self, t=None):
        """Return the request signature: MD5 of characters 3..7 of ``t``.

        Args:
            t: Millisecond timestamp (str or int). Defaults to the current
                time, which matches the previous zero-argument behaviour.
        """
        if t is None:
            t = int(time.time() * 1000)
        return self.md5(str(t)[3:8])

    def _signed_headers(self):
        """Build the User-Agent/deviceid/t/s headers from ONE timestamp.

        Reading the clock once keeps 's' consistent with 't'; presumably the
        server validates the signature against the 't' header (TODO confirm).
        """
        t = str(int(time.time() * 1000))
        return {'User-Agent': self._APP_UA, 'deviceid': self.did, 't': t, 's': self.getsign(t)}

    def gettoken(self):
        """Register as a guest ("traveler") and return ``(token, imgDomain)``."""
        url = f'{self.host}/api/user/traveler'
        body = {'deviceId': self.did, 'tt': 'U', 'code': '', 'chCode': 'dafe13'}
        reply = self.post(url, json=body, headers=self._signed_headers()).json()
        info = reply['data']
        return info['token'], info['imgDomain']

    def headers(self):
        """Return signed API headers including the guest token under ``aut``."""
        henda = self._signed_headers()
        henda['aut'] = self.token
        return henda

    def action(self, param):
        """Fetch ``param['url']`` from the image host and return the decoded image.

        Returns:
            ``[200, content_type, body_bytes]``.
        """
        headers = {
            'User-Agent': 'Dalvik/2.1.0 (Linux; U; Android 11; M2012K10C Build/RP1A.200720.011)'}
        resp = self.fetch(f'{self.phost}{param["url"]}', headers=headers)
        # The header may be absent; fall back to a generic type instead of
        # crashing on None.split().
        raw_type = resp.headers.get('Content-Type') or 'application/octet-stream'
        content_type = raw_type.split(';')[0]
        return [200, content_type, self.img(resp.content, 100, '2020-zq3-888')]

    def img(self, data: bytes, length: int, key: str):
        """Undo the XOR scrambling applied to the first ``length`` bytes.

        Data that already looks like a PNG, GIF or JPEG is returned
        unchanged. Otherwise the first ``length`` bytes (or fewer, if the
        payload is shorter) are XOR-ed with the repeating UTF-8 bytes of
        ``key``.
        """
        GIF = b'\x47\x49\x46'
        JPG = b'\xFF\xD8\xFF'
        PNG = b'\x89\x50\x4E\x47\x0D\x0A\x1A\x0A'

        # The PNG check deliberately ignores the first signature byte,
        # exactly as the original comparison did.
        looks_png = len(data) > 7 and data[1:8] == PNG[1:8]
        looks_gif = len(data) > 2 and data[:3] == GIF
        looks_jpg = len(data) > 7 and data[:3] == JPG
        if looks_png or looks_gif or looks_jpg:
            return data

        key_bytes = key.encode('utf-8')
        if not key_bytes:
            # Nothing to XOR with; avoids a modulo-by-zero below.
            return data
        result = bytearray(data)
        # Clamp to the payload size so short responses do not raise IndexError.
        for i in range(min(length, len(result))):
            result[i] ^= key_bytes[i % len(key_bytes)]
        return bytes(result)
import json
import re
import sys
import time
from base64 import b64decode, b64encode
from urllib.parse import quote

from Crypto.Cipher import AES
from Crypto.Hash import MD5
from Crypto.Util.Padding import pad, unpad

sys.path.append("..")
from base.spider import Spider


class Spider(Spider):
    """Spider plugin that reports itself as "光速".

    Every API call goes through :meth:`getdata`, which POSTs to the host
    discovered by :meth:`gethost` and AES-decrypts the ``data`` field of the
    JSON reply.
    """

    # Default headers handed to the player and used by the local proxy.
    # Never mutate this class-level dict in place: playerContent rebinds an
    # instance-level copy when an item supplies its own User-Agent.
    phend = {
        'User-Agent': 'Dalvik/2.1.0 (Linux; U; Android 11; M2012K10C Build/RP1A.200720.011)'}

    def getName(self):
        """Return the display name of this spider."""
        return "光速"

    def init(self, extend=""):
        """Resolve and remember the API host."""
        self.host = self.gethost()

    def isVideoFormat(self, url):
        """Not implemented; returns None."""
        pass

    def manualVideoCheck(self):
        """Not implemented; returns None."""
        pass

    def action(self, action):
        """Not implemented; returns None."""
        pass

    def destroy(self):
        """Nothing to release."""
        pass

    def homeContent(self, filter):
        """Return categories, per-category filters and the home recommendation list.

        Categories named "全部" are skipped. Each remaining category's
        ``type_extend`` JSON is turned into filter definitions, with a fixed
        ``sort`` option list added to every category.
        """
        data = self.getdata("/api.php/getappapi.index/initV119")
        dy = {"class": "类型", "area": "地区", "lang": "语言", "year": "年份", "letter": "字母", "by": "排序",
              "sort": "排序", }
        filters = {}
        classes = []
        homedata = data["banner_list"]
        for item in data["type_list"]:
            if item["type_name"] == "全部":
                continue
            extend_cfg = json.loads(item["type_extend"])
            homedata.extend(item["recommend_list"])
            extend_cfg["sort"] = "最新,最热,最赞"
            classes.append({"type_name": item["type_name"], "type_id": item["type_id"]})
            entries = []
            for dkey, raw in extend_cfg.items():
                # Only known filter keys with a non-blank option string count.
                if dkey not in dy or raw.strip() == "":
                    continue
                options = [{"n": opt.strip(), "v": opt.strip()}
                           for opt in raw.split(",") if opt.strip() != ""]
                entries.append({"key": dkey, "name": dy[dkey], "value": options})
            if entries:
                filters[str(item["type_id"])] = entries
        return {"class": classes, "filters": filters, "list": homedata}

    def homeVideoContent(self):
        """Not implemented; returns None."""
        pass

    def categoryContent(self, tid, pg, filter, extend):
        """Return one page of category ``tid`` filtered by ``extend``.

        ``pagecount``/``limit``/``total`` are fixed placeholder values; the
        code does not read real totals from the response.
        """
        body = {"area": extend.get('area', '全部'), "year": extend.get('year', '全部'), "type_id": tid, "page": pg,
                "sort": extend.get('sort', '最新'), "lang": extend.get('lang', '全部'),
                "class": extend.get('class', '全部')}
        data = self.getdata("/api.php/getappapi.index/typeFilterVodList", body)
        return {"list": data["recommend_list"], "page": pg, "pagecount": 9999, "limit": 90,
                "total": 999999}

    def detailContent(self, ids):
        """Return the detail of ``ids[0]`` with play sources and episode ids.

        Each episode id has the form ``base64(url)|||base64(user_agent)|||token``.
        URLs that are not direct ``.m3u8``/``.mp4`` links are stored as
        ``parse@@url`` so :meth:`playerContent` can resolve them later.
        """
        data = self.getdata("/api.php/getappapi.index/vodDetail", f"vod_id={ids[0]}")
        vod = data["vod"]

        play = []
        names = []
        for source in data["vod_play_list"]:
            info = source["player_info"]
            names.append(info["show"])
            parse = info["parse"]
            ua = ''
            if info.get("user_agent", ''):
                ua = b64encode(info["user_agent"].encode('utf-8')).decode('utf-8')
            episodes = []
            for ep in source["urls"]:
                url = ep["url"]
                if not re.search(r'\.m3u8|\.mp4', url):
                    url = parse + '@@' + url
                url = b64encode(url.encode('utf-8')).decode('utf-8')
                episodes.append(f"{ep['name']}${url}|||{ua}|||{ep['token']}")
            play.append("#".join(episodes))
        vod["vod_play_from"] = "$$$".join(names)
        vod["vod_play_url"] = "$$$".join(play)
        return {"list": [vod]}

    def searchContent(self, key, quick, pg="1"):
        """Search for ``key`` and return the requested result page."""
        # The body is sent as application/x-www-form-urlencoded, so the
        # keyword must be percent-encoded ('&', '=' and non-ASCII text would
        # otherwise corrupt the form).
        body = f"keywords={quote(key)}&type_id=0&page={pg}"
        data = self.getdata("/api.php/getappapi.index/searchList", body)
        return {"list": data["search_list"], "page": pg}

    def playerContent(self, flag, id, vipFlags):
        """Resolve an episode id from detailContent into a playable URL.

        Non-direct URLs are resolved through the ``vodParse`` endpoint. URLs
        containing an image extension are routed through the local proxy with
        ``type=m3u8``.
        """
        ids = id.split("|||")
        if len(ids) > 1 and ids[1]:
            # Rebind an instance-level copy instead of mutating the shared
            # class dict, so the override does not leak to other instances.
            self.phend = {**self.phend, 'User-Agent': b64decode(ids[1]).decode('utf-8')}
        url = b64decode(ids[0]).decode('utf-8')
        if not re.search(r'\.m3u8|\.mp4', url):
            a = url.split("@@")
            encrypted = quote(self.aes('encrypt', a[1]))
            body = f"parse_api={a[0]}&url={encrypted}&token={ids[-1]}"
            jd = self.getdata("/api.php/getappapi.index/vodParse", body)['json']
            url = json.loads(jd)['url']
        if '.jpg' in url or '.png' in url or '.jpeg' in url:
            url = self.getProxyUrl() + "&url=" + b64encode(url.encode('utf-8')).decode('utf-8') + "&type=m3u8"
        return {"parse": 0, "url": url, "header": dict(self.phend)}

    def localProxy(self, param):
        """Fetch a playlist and make its relative entries absolute.

        Lines that contain neither ``#EXT`` nor ``http`` are prefixed with
        the directory part of the playlist URL.

        Returns:
            ``[200, content_type, playlist_text]``.
        """
        url = b64decode(param["url"]).decode('utf-8')
        durl = url[:url.rfind('/')]
        data = self.fetch(url, headers=self.phend).content.decode("utf-8")
        lines = data.strip().split('\n')
        for index, entry in enumerate(lines):
            if not entry.strip():
                # Leave blank lines alone rather than turning them into URLs.
                continue
            if '#EXT' not in entry and 'http' not in entry:
                lines[index] = durl + ('' if entry.startswith('/') else '/') + entry
        # Correct HLS media type (the previous value was missing the final 'l').
        return [200, "application/vnd.apple.mpegurl", '\n'.join(lines)]

    def gethost(self):
        """Download and return the current API host from the remote config file."""
        host = self.fetch('https://jingyu-1312635929.cos.ap-nanjing.myqcloud.com/1.json').text.strip()
        return host

    def aes(self, operation, text):
        """AES-CBC encrypt or decrypt ``text``; the key doubles as the IV.

        Args:
            operation: ``"encrypt"`` (str -> base64 str) or ``"decrypt"``
                (base64 str -> str).
            text: Plaintext or base64 ciphertext.

        Raises:
            ValueError: If ``operation`` is neither value (previously this
                silently returned None).
        """
        key = "4d83b87c4c5ea111".encode("utf-8")
        cipher = AES.new(key, AES.MODE_CBC, key)
        if operation == "encrypt":
            ct_bytes = cipher.encrypt(pad(text.encode("utf-8"), AES.block_size))
            return b64encode(ct_bytes).decode("utf-8")
        if operation == "decrypt":
            pt = unpad(cipher.decrypt(b64decode(text)), AES.block_size)
            return pt.decode("utf-8")
        raise ValueError(f"unsupported AES operation: {operation!r}")

    def header(self):
        """Build request headers signed with the current Unix time (seconds).

        The device id is the MD5 of the timestamp; the verify-sign is the
        AES-encrypted timestamp.
        """
        t = str(int(time.time()))
        md5_hash = MD5.new()
        md5_hash.update(t.encode('utf-8'))
        signature_md5 = md5_hash.hexdigest()
        return {"User-Agent": "okhttp/3.14.9", "app-version-code": "300", "app-ui-mode": "light",
                "app-user-device-id": signature_md5, "app-api-verify-time": t,
                "app-api-verify-sign": self.aes("encrypt", t),
                "Content-Type": "application/x-www-form-urlencoded"}

    def getdata(self, path, data=None):
        """POST to ``path`` on the API host and return the decrypted JSON payload."""
        # NOTE(review): verify=False disables TLS certificate verification.
        # Kept as-is because the upstream host may rely on it - confirm
        # whether it can be removed.
        enc = self.post(self.host + path, headers=self.header(), data=data, verify=False).json()["data"]
        return json.loads(self.aes("decrypt", enc))