Update and rename py光速.py to py_小红书.py

This commit is contained in:
alantang 2025-02-19 14:00:20 +08:00 committed by GitHub
parent 805990af3f
commit a5ca4acbd1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 176 additions and 195 deletions

176
py/py_小红书.py Normal file
View File

@ -0,0 +1,176 @@
# coding=utf-8
# !/usr/bin/python
# by嗷呜
import json
import random
import sys
import time
from base64 import b64decode
from Crypto.Cipher import AES
from Crypto.Hash import MD5
from Crypto.Util.Padding import unpad
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def getName(self):
return "小红书"
def init(self, extend=""):
self.did = self.random_str(32)
self.token,self.phost = self.gettoken()
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def destroy(self):
pass
def random_str(self,length=16):
hex_chars = '0123456789abcdef'
return ''.join(random.choice(hex_chars) for _ in range(length))
def md5(self, text: str) -> str:
h = MD5.new()
h.update(text.encode('utf-8'))
return h.hexdigest()
def homeContent(self, filter):
data = self.fetch(f'{self.host}/api/video/queryClassifyList?mark=4', headers=self.headers()).json()['encData']
data1 = self.aes(data)
result = {}
classes = []
for k in data1['data']:
classes.append({'type_name': k['classifyTitle'], 'type_id': k['classifyId']})
result['class'] = classes
return result
def homeVideoContent(self):
pass
def categoryContent(self, tid, pg, filter, extend):
path=f'/api/short/video/getShortVideos?classifyId={tid}&videoMark=4&page={pg}&pageSize=20'
result = {}
videos = []
data=self.fetch(f'{self.host}{path}', headers=self.headers()).json()['encData']
vdata=self.aes(data)
for k in vdata['data']:
videos.append({"vod_id": k['videoId'], 'vod_name': k.get('title'), 'vod_pic': self.getProxyUrl() + '&url=' + k['coverImg'],
'vod_remarks': self.dtim(k.get('playTime'))})
result["list"] = videos
result["page"] = pg
result["pagecount"] = 9999
result["limit"] = 90
result["total"] = 999999
return result
def detailContent(self, ids):
path = f'/api/video/getVideoById?videoId={ids[0]}'
data = self.fetch(f'{self.host}{path}', headers=self.headers()).json()['encData']
v = self.aes(data)
d=f'{v["title"]}$auth_key={v["authKey"]}&path={v["videoUrl"]}'
vod = {'vod_name': v["title"], 'type_name': ''.join(v.get('tagTitles',[])),'vod_play_from': v.get('nickName') or "小红书官方", 'vod_play_url': d}
result = {"list": [vod]}
return result
def searchContent(self, key, quick, pg='1'):
pass
def playerContent(self, flag, id, vipFlags):
h=self.headers()
h['Authorization'] = h.pop('aut')
del h['deviceid']
result = {"parse": 0, "url": f"{self.host}/api/m3u8/decode/authPath?{id}", "header": h}
return result
def localProxy(self, param):
return self.action(param)
def aes(self, word):
key = b64decode("SmhiR2NpT2lKSVV6STFOaQ==")
iv = key
cipher = AES.new(key, AES.MODE_CBC, iv)
decrypted = unpad(cipher.decrypt(b64decode(word)), AES.block_size)
return json.loads(decrypted.decode('utf-8'))
def dtim(self, seconds):
try:
seconds = int(seconds)
hours = seconds // 3600
remaining_seconds = seconds % 3600
minutes = remaining_seconds // 60
remaining_seconds = remaining_seconds % 60
formatted_minutes = str(minutes).zfill(2)
formatted_seconds = str(remaining_seconds).zfill(2)
if hours > 0:
formatted_hours = str(hours).zfill(2)
return f"{formatted_hours}:{formatted_minutes}:{formatted_seconds}"
else:
return f"{formatted_minutes}:{formatted_seconds}"
except:
return ''
def getsign(self):
t=str(int(time.time() * 1000))
return self.md5(t[3:8])
def gettoken(self):
url = f'{self.host}/api/user/traveler'
headers = {
'User-Agent': 'Mozilla/5.0 (Linux; Android 11; M2012K10C Build/RP1A.200720.011; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/87.0.4280.141 Mobile Safari/537.36;SuiRui/xhs/ver=1.2.6',
'deviceid': self.did, 't': str(int(time.time() * 1000)), 's': self.getsign(), }
data = {'deviceId': self.did, 'tt': 'U', 'code': '', 'chCode': 'dafe13'}
data1 = self.post(url, json=data, headers=headers).json()
data2 = data1['data']
return data2['token'], data2['imgDomain']
host = 'https://jhfkdnov21vfd.fhoumpjjih.work'
def headers(self):
henda = {
'User-Agent': 'Mozilla/5.0 (Linux; Android 11; M2012K10C Build/RP1A.200720.011; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/87.0.4280.141 Mobile Safari/537.36;SuiRui/xhs/ver=1.2.6',
'deviceid': self.did, 't': str(int(time.time() * 1000)), 's': self.getsign(), 'aut': self.token}
return henda
def action(self, param):
headers = {
'User-Agent': 'Dalvik/2.1.0 (Linux; U; Android 11; M2012K10C Build/RP1A.200720.011)'}
data = self.fetch(f'{self.phost}{param["url"]}', headers=headers)
type=data.headers.get('Content-Type').split(';')[0]
base64_data = self.img(data.content, 100, '2020-zq3-888')
return [200, type, base64_data]
def img(self, data: bytes, length: int, key: str):
GIF = b'\x47\x49\x46'
JPG = b'\xFF\xD8\xFF'
PNG = b'\x89\x50\x4E\x47\x0D\x0A\x1A\x0A'
def is_dont_need_decode_for_gif(data):
return len(data) > 2 and data[:3] == GIF
def is_dont_need_decode_for_jpg(data):
return len(data) > 7 and data[:3] == JPG
def is_dont_need_decode_for_png(data):
return len(data) > 7 and data[1:8] == PNG[1:8]
if is_dont_need_decode_for_png(data):
return data
elif is_dont_need_decode_for_gif(data):
return data
elif is_dont_need_decode_for_jpg(data):
return data
else:
key_bytes = key.encode('utf-8')
result = bytearray(data)
for i in range(length):
result[i] ^= key_bytes[i % len(key_bytes)]
return bytes(result)

View File

@ -1,195 +0,0 @@
# coding=utf-8
# !/usr/bin/python
# by嗷呜
import re
import sys
from urllib.parse import quote
from Crypto.Hash import MD5
sys.path.append("..")
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from base64 import b64encode, b64decode
import json
import time
from base.spider import Spider
class Spider(Spider):
def getName(self):
return "光速"
def init(self, extend=""):
self.host = self.gethost()
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def action(self, action):
pass
def destroy(self):
pass
def homeContent(self, filter):
data = self.getdata("/api.php/getappapi.index/initV119")
dy = {"class": "类型", "area": "地区", "lang": "语言", "year": "年份", "letter": "字母", "by": "排序",
"sort": "排序", }
filters = {}
classes = []
json_data = data["type_list"]
homedata = data["banner_list"]
for item in json_data:
if item["type_name"] == "全部":
continue
has_non_empty_field = False
jsontype_extend = json.loads(item["type_extend"])
homedata.extend(item["recommend_list"])
jsontype_extend["sort"] = "最新,最热,最赞"
classes.append({"type_name": item["type_name"], "type_id": item["type_id"]})
for key in dy:
if key in jsontype_extend and jsontype_extend[key].strip() != "":
has_non_empty_field = True
break
if has_non_empty_field:
filters[str(item["type_id"])] = []
for dkey in jsontype_extend:
if dkey in dy and jsontype_extend[dkey].strip() != "":
values = jsontype_extend[dkey].split(",")
value_array = [{"n": value.strip(), "v": value.strip()} for value in values if
value.strip() != ""]
filters[str(item["type_id"])].append({"key": dkey, "name": dy[dkey], "value": value_array})
result = {}
result["class"] = classes
result["filters"] = filters
result["list"] = homedata
return result
def homeVideoContent(self):
pass
def categoryContent(self, tid, pg, filter, extend):
body = {"area": extend.get('area', '全部'), "year": extend.get('year', '全部'), "type_id": tid, "page": pg,
"sort": extend.get('sort', '最新'), "lang": extend.get('lang', '全部'),
"class": extend.get('class', '全部')}
result = {}
data = self.getdata("/api.php/getappapi.index/typeFilterVodList", body)
result["list"] = data["recommend_list"]
result["page"] = pg
result["pagecount"] = 9999
result["limit"] = 90
result["total"] = 999999
return result
def detailContent(self, ids):
body = f"vod_id={ids[0]}"
data = self.getdata("/api.php/getappapi.index/vodDetail", body)
vod = data["vod"]
play = []
names = []
for itt in data["vod_play_list"]:
a = []
names.append(itt["player_info"]["show"])
parse = itt["player_info"]["parse"]
ua = ''
if itt["player_info"].get("user_agent", ''):
ua = b64encode(itt["player_info"]["user_agent"].encode('utf-8')).decode('utf-8')
for it in itt["urls"]:
url = it["url"]
if not re.search(r'\.m3u8|\.mp4', url):
url = parse + '@@' + url
url = b64encode(url.encode('utf-8')).decode('utf-8')
a.append(f"{it['name']}${url}|||{ua}|||{it['token']}")
play.append("#".join(a))
vod["vod_play_from"] = "$$$".join(names)
vod["vod_play_url"] = "$$$".join(play)
result = {"list": [vod]}
return result
def searchContent(self, key, quick, pg="1"):
body = f"keywords={key}&type_id=0&page={pg}"
data = self.getdata("/api.php/getappapi.index/searchList", body)
result = {"list": data["search_list"], "page": pg}
return result
phend = {
'User-Agent': 'Dalvik/2.1.0 (Linux; U; Android 11; M2012K10C Build/RP1A.200720.011)'}
def playerContent(self, flag, id, vipFlags):
ids = id.split("|||")
if ids[1]: self.phend['User-Agent'] = b64decode(ids[1]).decode('utf-8')
url = b64decode(ids[0]).decode('utf-8')
if not re.search(r'\.m3u8|\.mp4', url):
a = url.split("@@")
body = f"parse_api={a[0]}&url={quote(self.aes('encrypt', a[1]))}&token={ids[-1]}"
jd = self.getdata("/api.php/getappapi.index/vodParse", body)['json']
url = json.loads(jd)['url']
# if '.mp4' not in url:
# l=self.fetch(url, headers=self.phend,allow_redirects=False)
# if l.status_code == 200 and l.headers.get('Location',''):
# url=l.headers['Location']
if '.jpg' in url or '.png' in url or '.jpeg' in url:
url = self.getProxyUrl() + "&url=" + b64encode(url.encode('utf-8')).decode('utf-8') + "&type=m3u8"
result = {}
result["parse"] = 0
result["url"] = url
result["header"] = self.phend
return result
def localProxy(self, param):
url = b64decode(param["url"]).decode('utf-8')
durl = url[:url.rfind('/')]
data = self.fetch(url, headers=self.phend).content.decode("utf-8")
inde = None
pd = True
lines = data.strip().split('\n')
for index, string in enumerate(lines):
# if '#EXT-X-DISCONTINUITY' in string and pd:
# pd = False
# inde = index
if '#EXT' not in string and 'http' not in string:
lines[index] = durl + ('' if string.startswith('/') else '/') + string
if inde:
del lines[inde:inde + 4]
data = '\n'.join(lines)
return [200, "application/vnd.apple.mpegur", data]
def gethost(self):
host = self.fetch('https://jingyu-1312635929.cos.ap-nanjing.myqcloud.com/1.json').text.strip()
return host
def aes(self, operation, text):
key = "4d83b87c4c5ea111".encode("utf-8")
iv = key
if operation == "encrypt":
cipher = AES.new(key, AES.MODE_CBC, iv)
ct_bytes = cipher.encrypt(pad(text.encode("utf-8"), AES.block_size))
ct = b64encode(ct_bytes).decode("utf-8")
return ct
elif operation == "decrypt":
cipher = AES.new(key, AES.MODE_CBC, iv)
pt = unpad(cipher.decrypt(b64decode(text)), AES.block_size)
return pt.decode("utf-8")
def header(self):
t = str(int(time.time()))
md5_hash = MD5.new()
md5_hash.update(t.encode('utf-8'))
signature_md5 = md5_hash.hexdigest()
header = {"User-Agent": "okhttp/3.14.9", "app-version-code": "300", "app-ui-mode": "light",
"app-user-device-id": signature_md5, "app-api-verify-time": t,
"app-api-verify-sign": self.aes("encrypt", t), "Content-Type": "application/x-www-form-urlencoded"}
return header
def getdata(self, path, data=None):
# data = self.post(self.host + path, headers=self.header(), data=data).text
data = self.post(self.host + path, headers=self.header(), data=data, verify=False).json()["data"]
data1 = self.aes("decrypt", data)
return json.loads(data1)