From 4e6be584329bf4a57f4c972e3cbd8fd675093bf3 Mon Sep 17 00:00:00 2001 From: alantang <107459091+alantang1977@users.noreply.github.com> Date: Wed, 19 Feb 2025 13:12:55 +0800 Subject: [PATCH] =?UTF-8?q?Delete=20py/=E5=B0=8F=E7=BA=A2=E4=B9=A6.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- py/小红书.py | 176 ------------------------------------------------ 1 file changed, 176 deletions(-) delete mode 100644 py/小红书.py diff --git a/py/小红书.py b/py/小红书.py deleted file mode 100644 index 125af7c0..00000000 --- a/py/小红书.py +++ /dev/null @@ -1,176 +0,0 @@ -# coding=utf-8 -# !/usr/bin/python -# by嗷呜 -import json -import random -import sys -import time -from base64 import b64decode -from Crypto.Cipher import AES -from Crypto.Hash import MD5 -from Crypto.Util.Padding import unpad -sys.path.append('..') -from base.spider import Spider - - -class Spider(Spider): - - def getName(self): - return "小红书" - - def init(self, extend=""): - self.did = self.random_str(32) - self.token,self.phost = self.gettoken() - pass - - def isVideoFormat(self, url): - pass - - def manualVideoCheck(self): - pass - - def destroy(self): - pass - - def random_str(self,length=16): - hex_chars = '0123456789abcdef' - return ''.join(random.choice(hex_chars) for _ in range(length)) - - def md5(self, text: str) -> str: - h = MD5.new() - h.update(text.encode('utf-8')) - return h.hexdigest() - - def homeContent(self, filter): - data = self.fetch(f'{self.host}/api/video/queryClassifyList?mark=4', headers=self.headers()).json()['encData'] - data1 = self.aes(data) - result = {} - classes = [] - for k in data1['data']: - classes.append({'type_name': k['classifyTitle'], 'type_id': k['classifyId']}) - result['class'] = classes - return result - - def homeVideoContent(self): - pass - - def categoryContent(self, tid, pg, filter, extend): - path=f'/api/short/video/getShortVideos?classifyId={tid}&videoMark=4&page={pg}&pageSize=20' - result = {} - videos = [] - data=self.fetch(f'{self.host}{path}', headers=self.headers()).json()['encData'] - vdata=self.aes(data) - for k in vdata['data']: - videos.append({"vod_id": k['videoId'], 'vod_name': k.get('title'), 'vod_pic': self.getProxyUrl() + '&url=' + k['coverImg'], - 'vod_remarks': self.dtim(k.get('playTime'))}) - result["list"] = videos - result["page"] = pg - result["pagecount"] = 9999 - result["limit"] = 90 - result["total"] = 999999 - return result - - def detailContent(self, ids): - path = f'/api/video/getVideoById?videoId={ids[0]}' - data = self.fetch(f'{self.host}{path}', headers=self.headers()).json()['encData'] - v = self.aes(data) - d=f'{v["title"]}$auth_key={v["authKey"]}&path={v["videoUrl"]}' - vod = {'vod_name': v["title"], 'type_name': ''.join(v.get('tagTitles',[])),'vod_play_from': v.get('nickName') or "小红书官方", 'vod_play_url': d} - result = {"list": [vod]} - return result - - def searchContent(self, key, quick, pg='1'): - pass - - def playerContent(self, flag, id, vipFlags): - h=self.headers() - h['Authorization'] = h.pop('aut') - del h['deviceid'] - result = {"parse": 0, "url": f"{self.host}/api/m3u8/decode/authPath?{id}", "header": h} - return result - - def localProxy(self, param): - return self.action(param) - - def aes(self, word): - key = b64decode("SmhiR2NpT2lKSVV6STFOaQ==") - iv = key - cipher = AES.new(key, AES.MODE_CBC, iv) - decrypted = unpad(cipher.decrypt(b64decode(word)), AES.block_size) - return json.loads(decrypted.decode('utf-8')) - - def dtim(self, seconds): - try: - seconds = int(seconds) - hours = seconds // 3600 - remaining_seconds = seconds % 3600 - minutes = remaining_seconds // 60 - remaining_seconds = remaining_seconds % 60 - - formatted_minutes = str(minutes).zfill(2) - formatted_seconds = str(remaining_seconds).zfill(2) - - if hours > 0: - formatted_hours = str(hours).zfill(2) - return f"{formatted_hours}:{formatted_minutes}:{formatted_seconds}" - else: - return f"{formatted_minutes}:{formatted_seconds}" - except: - return '' - - def getsign(self): - t=str(int(time.time() * 1000)) - return self.md5(t[3:8]) - - def gettoken(self): - url = f'{self.host}/api/user/traveler' - headers = { - 'User-Agent': 'Mozilla/5.0 (Linux; Android 11; M2012K10C Build/RP1A.200720.011; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/87.0.4280.141 Mobile Safari/537.36;SuiRui/xhs/ver=1.2.6', - 'deviceid': self.did, 't': str(int(time.time() * 1000)), 's': self.getsign(), } - data = {'deviceId': self.did, 'tt': 'U', 'code': '', 'chCode': 'dafe13'} - data1 = self.post(url, json=data, headers=headers).json() - data2 = data1['data'] - return data2['token'], data2['imgDomain'] - - host = 'https://jhfkdnov21vfd.fhoumpjjih.work' - - def headers(self): - henda = { - 'User-Agent': 'Mozilla/5.0 (Linux; Android 11; M2012K10C Build/RP1A.200720.011; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/87.0.4280.141 Mobile Safari/537.36;SuiRui/xhs/ver=1.2.6', - 'deviceid': self.did, 't': str(int(time.time() * 1000)), 's': self.getsign(), 'aut': self.token} - return henda - - def action(self, param): - headers = { - 'User-Agent': 'Dalvik/2.1.0 (Linux; U; Android 11; M2012K10C Build/RP1A.200720.011)'} - data = self.fetch(f'{self.phost}{param["url"]}', headers=headers) - type=data.headers.get('Content-Type').split(';')[0] - base64_data = self.img(data.content, 100, '2020-zq3-888') - return [200, type, base64_data] - - def img(self, data: bytes, length: int, key: str): - GIF = b'\x47\x49\x46' - JPG = b'\xFF\xD8\xFF' - PNG = b'\x89\x50\x4E\x47\x0D\x0A\x1A\x0A' - - def is_dont_need_decode_for_gif(data): - return len(data) > 2 and data[:3] == GIF - - def is_dont_need_decode_for_jpg(data): - return len(data) > 7 and data[:3] == JPG - - def is_dont_need_decode_for_png(data): - return len(data) > 7 and data[1:8] == PNG[1:8] - - if is_dont_need_decode_for_png(data): - return data - elif is_dont_need_decode_for_gif(data): - return data - elif is_dont_need_decode_for_jpg(data): - return data - else: - key_bytes = key.encode('utf-8') - result = bytearray(data) - for i in range(length): - result[i] ^= key_bytes[i % len(key_bytes)] - return bytes(result)