Add files via upload

This commit is contained in:
alantang 2025-02-19 12:37:57 +08:00 committed by GitHub
parent cc564afdad
commit cbfe32ccbf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 2280 additions and 3 deletions

213
py/lav.py Normal file
View File

@ -0,0 +1,213 @@
# coding=utf-8
# !/usr/bin/python
# 嗷呜
import sys
from base64 import b64encode, b64decode
from Crypto.Hash import MD5, SHA256
sys.path.append("..")
from base.spider import Spider
from Crypto.Cipher import AES
import json
import time
class Spider(Spider):
def getName(self):
return "lav"
def init(self, extend=""):
self.id = self.ms(str(int(time.time() * 1000)))[:16]
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def action(self, action):
pass
def destroy(self):
pass
host = "http://sir_new.tiansexyl.tv"
t = str(int(time.time() * 1000))
headers = {'User-Agent': 'okhttp-okgo/jeasonlzy', 'Connection': 'Keep-Alive',
'Content-Type': 'application/x-www-form-urlencoded'}
def homeContent(self, filter):
cateManual = {"演员": "actor", "分类": "avsearch", }
classes = []
for k in cateManual:
classes.append({'type_name': k, 'type_id': cateManual[k]})
j = {'code': 'homePage', 'mod': 'down', 'channel': 'self', 'via': 'agent', 'bundleId': 'com.tvlutv',
'app_type': 'rn', 'os_version': '12.0.5', 'version': '3.2.3', 'oauth_type': 'android_rn',
'oauth_id': self.id}
body = self.aes(j)
data = self.post(f'{self.host}/api.php?t={str(int(time.time() * 1000))}', data=body, headers=self.headers).json()['data']
data1 = self.aes(data, False)['data']
self.r = data1['r']
for i, d in enumerate(data1['avTag']):
# if i == 4:
# break
classes.append({'type_name': d['name'], 'type_id': d['tag']})
resutl = {}
resutl["class"] = classes
return resutl
def homeVideoContent(self):
pass
def categoryContent(self, tid, pg, filter, extend):
id = tid.split("@@")
result = {}
result["page"] = pg
result["pagecount"] = 9999
result["limit"] = 90
result["total"] = 999999
if id[0] == 'avsearch':
if pg == '1':
j = {'code': 'avsearch', 'mod': 'search', 'channel': 'self', 'via': 'agent', 'bundleId': 'com.tvlutv',
'app_type': 'rn', 'os_version': '12.0.5', 'version': '3.2.3', 'oauth_type': 'android_rn',
'oauth_id': self.id}
if len(id) > 1:
j = {'code': 'find', 'mod': 'tag', 'channel': 'self', 'via': 'agent', 'bundleId': 'com.tvlutv',
'app_type': 'rn', 'os_version': '12.0.5', 'version': '3.2.3', 'oauth_type': 'android_rn',
'oauth_id': self.id, 'type': 'av', 'dis': 'new', 'page': str(pg), 'tag': id[1]}
elif id[0] == 'actor':
j = {'mod': 'actor', 'channel': 'self', 'via': 'agent', 'bundleId': 'com.tvlutv', 'app_type': 'rn',
'os_version': '12.0.5', 'version': '3.2.3', 'oauth_type': 'android_rn', 'oauth_id': self.id,
'page': str(pg), 'filter': ''}
if len(id) > 1:
j = {'code': 'eq', 'mod': 'actor', 'channel': 'self', 'via': 'agent', 'bundleId': 'com.tvlutv',
'app_type': 'rn', 'os_version': '12.0.5', 'version': '3.2.3', 'oauth_type': 'android_rn',
'oauth_id': self.id, 'page': str(pg), 'id': id[1], 'actor': id[2]}
else:
j = {'code': 'search', 'mod': 'av', 'channel': 'self', 'via': 'agent', 'bundleId': 'com.tvlutv',
'app_type': 'rn', 'os_version': '12.0.5', 'version': '3.2.3', 'oauth_type': 'android_rn',
'oauth_id': self.id, 'page': str(pg), 'tag': id[0]}
body = self.aes(j)
data = self.post(f'{self.host}/api.php?t={str(int(time.time() * 1000))}', data=body, headers=self.headers).json()['data']
data1 = self.aes(data, False)['data']
videos = []
if tid == 'avsearch' and len(id) == 1:
for item in data1:
videos.append({"vod_id": id[0] + "@@" + str(item.get('tags')), 'vod_name': item.get('name'),
'vod_pic': self.imgs(item.get('ico')), 'vod_tag': 'folder',
'style': {"type": "rect", "ratio": 1.33}})
elif tid == 'actor' and len(id) == 1:
for item in data1:
videos.append({"vod_id": id[0] + "@@" + str(item.get('id')) + "@@" + item.get('name'),
'vod_name': item.get('name'), 'vod_pic': self.imgs(item.get('cover')),
'vod_tag': 'folder', 'style': {"type": "oval"}})
else:
for item in data1:
if item.get('_id'):
videos.append({"vod_id": str(item.get('id')), 'vod_name': item.get('title'),
'vod_pic': self.imgs(item.get('cover_thumb') or item.get('cover_full')),
'vod_remarks': item.get('good'), 'style': {"type": "rect", "ratio": 1.33}})
result["list"] = videos
return result
def detailContent(self, ids):
id = ids[0]
j = {'code': 'detail', 'mod': 'av', 'channel': 'self', 'via': 'agent', 'bundleId': 'com.tvlutv',
'app_type': 'rn', 'os_version': '12.0.5', 'version': '3.2.3', 'oauth_type': 'android_rn',
'oauth_id': self.id, 'id': id}
body = self.aes(j)
data = self.post(f'{self.host}/api.php?t={str(int(time.time() * 1000))}', data=body, headers=self.headers).json()['data']
data1 = self.aes(data, False)['line']
vod = {}
play = []
for itt in data1:
a = itt['line'].get('s720')
if a:
b = a.split('.')
b[0] = 'https://m3u8'
a = '.'.join(b)
play.append(itt['info']['tips'] + "$" + a)
break
vod["vod_play_from"] = 'LAV'
vod["vod_play_url"] = "#".join(play)
result = {"list": [vod]}
return result
def searchContent(self, key, quick, pg="1"):
pass
def playerContent(self, flag, id, vipFlags):
url = self.getProxyUrl() + "&url=" + b64encode(id.encode('utf-8')).decode('utf-8') + "&type=m3u8"
self.hh = {'User-Agent': 'dd', 'Connection': 'Keep-Alive', 'Referer': self.r}
result = {}
result["parse"] = 0
result["url"] = url
result["header"] = self.hh
return result
def localProxy(self, param):
url = param["url"]
if param.get('type') == "m3u8":
return self.vod(b64decode(url).decode('utf-8'))
else:
return self.img(url)
def vod(self, url):
data = self.fetch(url, headers=self.hh).text
key = bytes.fromhex("13d47399bda541b85e55830528d4e66f1791585b2d2216f23215c4c63ebace31")
iv = bytes.fromhex(data[:32])
data = data[32:]
cipher = AES.new(key, AES.MODE_CFB, iv, segment_size=128)
data_bytes = bytes.fromhex(data)
decrypted = cipher.decrypt(data_bytes)
encoded = decrypted.decode("utf-8").replace("\x08", "")
return [200, "application/vnd.apple.mpegur", encoded]
def imgs(self, url):
return self.getProxyUrl() + '&url=' + url
def img(self, url):
type = url.split('.')[-1]
data = self.fetch(url).text
key = bytes.fromhex("ba78f184208d775e1553550f2037f4af22cdcf1d263a65b4d5c74536f084a4b2")
iv = bytes.fromhex(data[:32])
data = data[32:]
cipher = AES.new(key, AES.MODE_CFB, iv, segment_size=128)
data_bytes = bytes.fromhex(data)
decrypted = cipher.decrypt(data_bytes)
return [200, f"image/{type}", decrypted]
def ms(self, data, m=False):
h = MD5.new()
if m:
h = SHA256.new()
h.update(data.encode('utf-8'))
return h.hexdigest()
def aes(self, data, operation=True):
key = bytes.fromhex("620f15cfdb5c79c34b3940537b21eda072e22f5d7151456dec3932d7a2b22c53")
t = str(int(time.time()))
ivt = self.ms(t)
if operation:
data = json.dumps(data, separators=(',', ':'))
iv = bytes.fromhex(ivt)
else:
iv = bytes.fromhex(data[:32])
data = data[32:]
cipher = AES.new(key, AES.MODE_CFB, iv, segment_size=128)
if operation:
data_bytes = data.encode('utf-8')
encrypted = cipher.encrypt(data_bytes)
ep = f'{ivt}{encrypted.hex()}'
edata = f"data={ep}&timestamp={t}0d27dfacef1338483561a46b246bf36d"
sign = self.ms(self.ms(edata, True))
edata = f"timestamp={t}&data={ep}&sign={sign}"
return edata
else:
data_bytes = bytes.fromhex(data)
decrypted = cipher.decrypt(data_bytes)
return json.loads(decrypted.decode('utf-8'))

266
py/py_Xhm.py Normal file
View File

@ -0,0 +1,266 @@
# coding=utf-8
# !/usr/bin/python
# by嗷呜
import json
import sys
from base64 import b64decode, b64encode
from pyquery import PyQuery as pq
from requests import Session
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def init(self, extend=""):
self.host = self.gethost()
self.headers['referer'] = f'{self.host}/'
self.session = Session()
self.session.headers.update(self.headers)
pass
def getName(self):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def destroy(self):
pass
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'sec-ch-ua': '"Not(A:Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-full-version': '"133.0.6943.98"',
'sec-ch-ua-arch': '"x86"',
'sec-ch-ua-platform': '"Windows"',
'sec-ch-ua-platform-version': '"19.0.0"',
'sec-ch-ua-model': '""',
'sec-ch-ua-full-version-list': '"Not(A:Brand";v="99.0.0.0", "Google Chrome";v="133.0.6943.98", "Chromium";v="133.0.6943.98"',
'dnt': '1',
'upgrade-insecure-requests': '1',
'sec-fetch-site': 'none',
'sec-fetch-mode': 'navigate',
'sec-fetch-user': '?1',
'sec-fetch-dest': 'document',
'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
'priority': 'u=0, i'
}
def homeContent(self, filter):
result = {}
cateManual = {
"4K": "/4k",
"国产": "two_click_/categories/chinese",
"最新": "/newest",
"最佳": "/best",
"频道": "/channels",
"类别": "/categories",
"明星": "/pornstars"
}
classes = []
filters = {}
for k in cateManual:
classes.append({
'type_name': k,
'type_id': cateManual[k]
})
if k !='4K':filters[cateManual[k]]=[{'key':'type','name':'类型','value':[{'n':'4K','v':'/4k'}]}]
result['class'] = classes
result['filters'] = filters
return result
def homeVideoContent(self):
data = self.getpq()
return {'list': self.getlist(data(".thumb-list--sidebar .thumb-list__item"))}
def categoryContent(self, tid, pg, filter, extend):
vdata = []
result = {}
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
if tid in ['/4k', '/newest', '/best'] or 'two_click_' in tid:
if 'two_click_' in tid: tid = tid.split('click_')[-1]
data = self.getpq(f'{tid}{extend.get("type","")}/{pg}')
vdata = self.getlist(data(".thumb-list--sidebar .thumb-list__item"))
elif tid == '/channels':
data = self.getpq(f'{tid}/{pg}')
jsdata = self.getjsdata(data)
for i in jsdata['channels']:
vdata.append({
'vod_id': f"two_click_" + i.get('channelURL'),
'vod_name': i.get('channelName'),
'vod_pic': i.get('siteLogoURL'),
'vod_year': f'videos:{i.get("videoCount")}',
'vod_tag': 'folder',
'vod_remarks': f'subscribers:{i["subscriptionModel"].get("subscribers")}',
'style': {'ratio': 1.33, 'type': 'rect'}
})
elif tid == '/categories':
result['pagecount'] = pg
data = self.getpq(tid)
self.cdata = self.getjsdata(data)
for i in self.cdata['layoutPage']['store']['popular']['assignable']:
vdata.append({
'vod_id': "one_click_" + i.get('id'),
'vod_name': i.get('name'),
'vod_pic': '',
'vod_tag': 'folder',
'style': {'ratio': 1.33, 'type': 'rect'}
})
elif tid == '/pornstars':
data = self.getpq(f'{tid}/{pg}')
pdata = self.getjsdata(data)
for i in pdata['pagesPornstarsComponent']['pornstarListProps']['pornstars']:
vdata.append({
'vod_id': f"two_click_" + i.get('pageURL'),
'vod_name': i.get('name'),
'vod_pic': i.get('imageThumbUrl'),
'vod_remarks': i.get('translatedCountryName'),
'vod_tag': 'folder',
'style': {'ratio': 1.33, 'type': 'rect'}
})
elif 'one_click' in tid:
result['pagecount'] = pg
tid = tid.split('click_')[-1]
for i in self.cdata['layoutPage']['store']['popular']['assignable']:
if i.get('id') == tid:
for j in i['items']:
vdata.append({
'vod_id': f"two_click_" + j.get('url'),
'vod_name': j.get('name'),
'vod_pic': j.get('thumb'),
'vod_tag': 'folder',
'style': {'ratio': 1.33, 'type': 'rect'}
})
result['list'] = vdata
return result
def detailContent(self, ids):
data = self.getpq(ids[0])
djs = self.getjsdata(data)
vn = data('meta[property="og:title"]').attr('content')
dtext = data('#video-tags-list-container')
href = dtext('a').attr('href')
title = dtext('span[class*="body-bold-"]').eq(0).text()
pdtitle = ''
if href:
pdtitle = '[a=cr:' + json.dumps({'id': 'two_click_' + href, 'name': title}) + '/]' + title + '[/a]'
vod = {
'vod_name': vn,
'vod_director': pdtitle,
'vod_remarks': data('.rb-new__info').text(),
'vod_play_from': 'Xhamster',
'vod_play_url': ''
}
try:
plist = []
d = djs['xplayerSettings']['sources']
f = d.get('standard')
ah=[]
if d.get('hls'):
for format_type, info in d['hls'].items():
if url := info.get('url'):
encoded = self.e64(f'{0}@@@@{url}')
plist.append(f"{format_type}${encoded}")
if f:
for key, value in f.items():
if isinstance(value, list):
ah.extend(value)
if len(ah):
for info in ah:
id = self.e64(f'{0}@@@@{info.get("url") or info.get("fallback")}')
plist.append(f"{info.get('label') or info.get('quality')}${id}")
except Exception as e:
plist = [f"{vn}${self.e64(f'{1}@@@@{ids[0]}')}"]
print(f"获取视频信息失败: {str(e)}")
vod['vod_play_url'] = '#'.join(plist)
return {'list': [vod]}
def searchContent(self, key, quick, pg="1"):
data = self.getpq(f'/search/{key}?page={pg}')
return {'list': self.getlist(data(".thumb-list--sidebar .thumb-list__item")), 'page': pg}
def playerContent(self, flag, id, vipFlags):
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.5410.0 Safari/537.36',
'pragma': 'no-cache',
'cache-control': 'no-cache',
'sec-ch-ua-platform': '"Windows"',
'sec-ch-ua': '"Not(A:Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"',
'dnt': '1',
'sec-ch-ua-mobile': '?0',
'origin': self.host,
'sec-fetch-site': 'cross-site',
'sec-fetch-mode': 'cors',
'sec-fetch-dest': 'empty',
'referer': f'{self.host}/',
'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
'priority': 'u=1, i',
}
ids = self.d64(id).split('@@@@')
return {'parse': int(ids[0]), 'url': ids[1], 'header': headers}
def localProxy(self, param):
pass
def gethost(self):
try:
response = self.fetch('https://xhamster.com', headers=self.headers, allow_redirects=False)
return response.headers['Location']
except Exception as e:
print(f"获取主页失败: {str(e)}")
return "https://zn.xhamster.com"
def e64(self, text):
try:
text_bytes = text.encode('utf-8')
encoded_bytes = b64encode(text_bytes)
return encoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64编码错误: {str(e)}")
return ""
def d64(self, encoded_text):
try:
encoded_bytes = encoded_text.encode('utf-8')
decoded_bytes = b64decode(encoded_bytes)
return decoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64解码错误: {str(e)}")
return ""
def getlist(self, data):
vlist = []
for i in data.items():
vlist.append({
'vod_id': i('.role-pop').attr('href'),
'vod_name': i('.video-thumb-info a').text(),
'vod_pic': i('.role-pop img').attr('src'),
'vod_year': i('.video-thumb-info .video-thumb-views').text().split(' ')[0],
'vod_remarks': i('.role-pop div[data-role="video-duration"]').text(),
'style': {'ratio': 1.33, 'type': 'rect'}
})
return vlist
def getpq(self, path=''):
h = '' if path.startswith('http') else self.host
response = self.session.get(f'{h}{path}').text
try:
return pq(response)
except Exception as e:
print(f"{str(e)}")
return pq(response.encode('utf-8'))
def getjsdata(self, data):
vhtml = data("script[id='initials-script']").text()
jst = json.loads(vhtml.split('initials=')[-1][:-1])
return jst

298
py/py_emby_proxy.py Normal file
View File

@ -0,0 +1,298 @@
#coding=utf-8
#!/usr/bin/python
import sys
import json
import time
import requests
from uuid import uuid4
from urllib.parse import quote
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def getName(self):
return "EMBY"
def init(self, extend):
try:
extendDict = json.loads(extend)
self.baseUrl = extendDict['server'].strip('/')
self.username = extendDict['username']
self.password = extendDict['password']
self.proxy = extendDict['proxy']
self.thread = extendDict['thread'] if 'thread' in extendDict else 0
except:
self.baseUrl = ''
self.username = ''
self.password = ''
self.proxy = ''
self.thread = 0
def destroy(self):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def homeContent(self, filter):
try:
embyInfos = self.getAccessToken()
except:
return {'msg': '获取Emby服务器信息出错'}
header = self.header.copy()
header['Content-Type'] = "application/json; charset=UTF-8"
url = f"{self.baseUrl}/emby/Users/{embyInfos['User']['Id']}/Views"
params = {
"X-Emby-Client": embyInfos['SessionInfo']['Client'],
"X-Emby-Device-Name": embyInfos['SessionInfo']['DeviceName'],
"X-Emby-Device-Id": embyInfos['SessionInfo']['DeviceId'],
"X-Emby-Client-Version": embyInfos['SessionInfo']['ApplicationVersion'],
"X-Emby-Token": embyInfos['AccessToken']
}
r = requests.get(url, params=params, headers=header, timeout=120, proxies={"http": self.proxy, "https": self.proxy})
typeInfos = r.json()["Items"]
classList = []
for typeInfo in typeInfos:
if "播放列表" in typeInfo['Name'] or '相机' in typeInfo['Name']:
continue
classList.append({"type_name": typeInfo['Name'], "type_id": typeInfo['Id']})
result = {'class': classList}
return result
def homeVideoContent(self):
return {}
def categoryContent(self, cid, page, filter, ext):
try:
embyInfos = self.getAccessToken()
except:
return {'list': [], 'msg': '获取Emby服务器信息出错'}
result = {}
page = int(page)
header = self.header.copy()
header['Content-Type'] = "application/json; charset=UTF-8"
url = f"{self.baseUrl}/emby/Users/{embyInfos['User']['Id']}/Items"
params = {
"X-Emby-Client": embyInfos['SessionInfo']['Client'],
"X-Emby-Device-Name": embyInfos['SessionInfo']['DeviceName'],
"X-Emby-Device-Id": embyInfos['SessionInfo']['DeviceId'],
"X-Emby-Client-Version": embyInfos['SessionInfo']['ApplicationVersion'],
"X-Emby-Token": embyInfos['AccessToken'],
"SortBy": "DateLastContentAdded,SortName",
"IncludeItemTypes": "Movie,Series",
"SortOrder": "Descending",
"ParentId": cid,
"Recursive": "true",
"Limit": "30",
"ImageTypeLimit": 1,
"StartIndex": str((page - 1) * 30),
"EnableImageTypes": "Primary,Backdrop,Thumb,Banner",
"Fields": "BasicSyncInfo,CanDelete,Container,PrimaryImageAspectRatio,ProductionYear,CommunityRating,Status,CriticRating,EndDate,Path",
"EnableUserData": "true"
}
r = requests.get(url, params=params, headers=header, timeout=120, proxies={"http": self.proxy, "https": self.proxy})
videoList = r.json()['Items']
videos = []
for video in videoList:
name = self.cleanText(video['Name'])
videos.append({
"vod_id": video['Id'],
"vod_name": name,
"vod_pic": f"{self.baseUrl}/emby/Items/{video['Id']}/Images/Primary?maxWidth=400&tag={video['ImageTags']['Primary']}&quality=90" if 'Primary' in video['ImageTags'] else '',
"vod_remarks": video['ProductionYear'] if 'ProductionYear' in video else ''
})
result['list'] = videos
result['page'] = page
result['pagecount'] = page + 1 if page * 30 < int(r.json()['TotalRecordCount']) else page
result['limit'] = len(videos)
result['total'] = int(r.json()['TotalRecordCount']) if "TotalRecordCount" in r.json() else 0
return result
def detailContent(self, did):
try:
embyInfos = self.getAccessToken()
except:
return {'list': [], 'msg': '获取Emby服务器信息出错'}
header = self.header.copy()
header['Content-Type'] = "application/json; charset=UTF-8"
url = f"{self.baseUrl}/emby/Users/{embyInfos['User']['Id']}/Items/{did[0]}"
params = {
"X-Emby-Client": embyInfos['SessionInfo']['Client'],
"X-Emby-Device-Name": embyInfos['SessionInfo']['DeviceName'],
"X-Emby-Device-Id": embyInfos['SessionInfo']['DeviceId'],
"X-Emby-Client-Version": embyInfos['SessionInfo']['ApplicationVersion'],
"X-Emby-Token": embyInfos['AccessToken']
}
r = requests.get(url, params=params, headers=header, timeout=120, proxies={"http": self.proxy, "https": self.proxy})
videoInfos = r.json()
vod = {
"vod_id": did[0],
"vod_name": videoInfos['Name'],
"vod_pic": f'{self.baseUrl}/emby/Items/{did[0]}/Images/Primary?maxWidth=400&tag={videoInfos["ImageTags"]["Primary"]}&quality=90' if 'Primary' in videoInfos['ImageTags'] else '',
"type_name": videoInfos['Genres'][0] if len(videoInfos['Genres']) > 0 else '',
"vod_year": videoInfos['ProductionYear'] if 'ProductionYear' in videoInfos else '',
"vod_content": videoInfos['Overview'].replace('\xa0', ' ').replace('\n\n', '\n').strip() if 'Overview' in videoInfos else '',
"vod_play_from": "EMBY"
}
playUrl = ''
if not videoInfos['IsFolder']:
playUrl += f"{videoInfos['Name'].strip()}${videoInfos['Id']}#"
else:
url = f"{self.baseUrl}/emby/Shows/{did[0]}/Seasons"
params.update(
{
"UserId": embyInfos['User']['Id'],
"EnableImages": "true",
"Fields": "BasicSyncInfo,CanDelete,Container,PrimaryImageAspectRatio,ProductionYear,CommunityRating",
"EnableUserData": "true",
"EnableTotalRecordCount": "false"
}
)
r = requests.get(url, params=params, headers=header, timeout=120, proxies={"http": self.proxy, "https": self.proxy})
if r.status_code == 200:
playInfos = r.json()['Items']
for playInfo in playInfos:
url = f"{self.baseUrl}/emby/Shows/{playInfo['Id']}/Episodes"
params.update(
{
"SeasonId": playInfo['Id'],
"Fields": "BasicSyncInfo,CanDelete,CommunityRating,PrimaryImageAspectRatio,ProductionYear,Overview"
}
)
r = requests.get(url, params=params, headers=header, timeout=120, proxies={"http": self.proxy, "https": self.proxy})
videoList = r.json()['Items']
for video in videoList:
playUrl += f"{playInfo['Name'].replace('#', '-').replace('$', '|').strip()}|{video['Name'].strip()}${video['Id']}#"
else:
url = f"{self.baseUrl}/emby/Users/{embyInfos['User']['Id']}/Items"
params = {
"ParentId": did[0],
"Fields": "BasicSyncInfo,CanDelete,Container,PrimaryImageAspectRatio,ProductionYear,CommunityRating,CriticRating",
"ImageTypeLimit": "1",
"StartIndex": "0",
"EnableUserData": "true",
"X-Emby-Client": embyInfos['SessionInfo']['Client'],
"X-Emby-Device-Name": embyInfos['SessionInfo']['DeviceName'],
"X-Emby-Device-Id": embyInfos['SessionInfo']['DeviceId'],
"X-Emby-Client-Version": embyInfos['SessionInfo']['ApplicationVersion'],
"X-Emby-Token": embyInfos['AccessToken']
}
r = requests.get(url, params=params, headers=header, timeout=120, proxies={"http": self.proxy, "https": self.proxy})
videoList = r.json()['Items']
for video in videoList:
playUrl += f"{video['Name'].replace('#', '-').replace('$', '|').strip()}${video['Id']}#"
vod['vod_play_url'] = playUrl.strip('#')
result = {'list': [vod]}
return result
def searchContent(self, key, quick, pg="1"):
return self.searchContentPage(key, quick, pg)
def searchContentPage(self, keywords, quick, page):
try:
embyInfos = self.getAccessToken()
except:
return {'list': [], 'msg': '获取Emby服务器信息出错'}
page = int(page)
header = self.header.copy()
header['Content-Type'] = "application/json; charset=UTF-8"
url = f"{self.baseUrl}/emby/Users/{embyInfos['User']['Id']}/Items"
params = {
"X-Emby-Client": embyInfos['SessionInfo']['Client'],
"X-Emby-Device-Name": embyInfos['SessionInfo']['DeviceName'],
"X-Emby-Device-Id": embyInfos['SessionInfo']['DeviceId'],
"X-Emby-Client-Version": embyInfos['SessionInfo']['ApplicationVersion'],
"X-Emby-Token": embyInfos['AccessToken'],
"SortBy": "SortName",
"SortOrder": "Ascending",
"Fields": "BasicSyncInfo,CanDelete,Container,PrimaryImageAspectRatio,ProductionYear,Status,EndDate",
"StartIndex": str(((page-1)*50)),
"EnableImageTypes": "Primary,Backdrop,Thumb",
"ImageTypeLimit": "1",
"Recursive": "true",
"SearchTerm": keywords,
"IncludeItemTypes": "Movie,Series,BoxSet",
"GroupProgramsBySeries": "true",
"Limit": "50",
"EnableTotalRecordCount": "true"
}
r = requests.get(url, params=params, headers=header, timeout=120, proxies={"http": self.proxy, "https": self.proxy})
videos = []
vodList = r.json()['Items']
for vod in vodList:
sid = vod['Id']
name = self.cleanText(vod['Name'])
pic = f'{self.baseUrl}/emby/Items/{sid}/Images/Primary?maxWidth=400&tag={vod["ImageTags"]["Primary"]}&quality=90' if 'Primary' in vod["ImageTags"] else ''
videos.append({
"vod_id": sid,
"vod_name": name,
"vod_pic": pic,
"vod_remarks": vod['ProductionYear'] if 'ProductionYear' in vod else ''
})
result = {'list': videos}
return result
def playerContent(self, flag, pid, vipFlags):
try:
embyInfos = self.getAccessToken()
except:
return {'list': [], 'msg': '获取Emby服务器信息出错'}
header = self.header.copy()
header['Content-Type'] = "application/json; charset=UTF-8"
url = f"{self.baseUrl}/emby/Items/{pid}/PlaybackInfo"
params = {
"UserId": embyInfos['User']['Id'],
"IsPlayback": "false",
"AutoOpenLiveStream": "false",
"StartTimeTicks": 0,
"MaxStreamingBitrate": "2147483647",
"X-Emby-Client": embyInfos['SessionInfo']['Client'],
"X-Emby-Device-Name": embyInfos['SessionInfo']['DeviceName'],
"X-Emby-Device-Id": embyInfos['SessionInfo']['DeviceId'],
"X-Emby-Client-Version": embyInfos['SessionInfo']['ApplicationVersion'],
"X-Emby-Token": embyInfos['AccessToken']
}
data = "{\"DeviceProfile\":{\"SubtitleProfiles\":[{\"Method\":\"Embed\",\"Format\":\"ass\"},{\"Format\":\"ssa\",\"Method\":\"Embed\"},{\"Format\":\"subrip\",\"Method\":\"Embed\"},{\"Format\":\"sub\",\"Method\":\"Embed\"},{\"Method\":\"Embed\",\"Format\":\"pgssub\"},{\"Format\":\"subrip\",\"Method\":\"External\"},{\"Method\":\"External\",\"Format\":\"sub\"},{\"Method\":\"External\",\"Format\":\"ass\"},{\"Format\":\"ssa\",\"Method\":\"External\"},{\"Method\":\"External\",\"Format\":\"vtt\"},{\"Method\":\"External\",\"Format\":\"ass\"},{\"Format\":\"ssa\",\"Method\":\"External\"}],\"CodecProfiles\":[{\"Codec\":\"h264\",\"Type\":\"Video\",\"ApplyConditions\":[{\"Property\":\"IsAnamorphic\",\"Value\":\"true\",\"Condition\":\"NotEquals\",\"IsRequired\":false},{\"IsRequired\":false,\"Value\":\"high|main|baseline|constrained baseline\",\"Condition\":\"EqualsAny\",\"Property\":\"VideoProfile\"},{\"IsRequired\":false,\"Value\":\"80\",\"Condition\":\"LessThanEqual\",\"Property\":\"VideoLevel\"},{\"IsRequired\":false,\"Value\":\"true\",\"Condition\":\"NotEquals\",\"Property\":\"IsInterlaced\"}]},{\"Codec\":\"hevc\",\"ApplyConditions\":[{\"Property\":\"IsAnamorphic\",\"Value\":\"true\",\"Condition\":\"NotEquals\",\"IsRequired\":false},{\"IsRequired\":false,\"Value\":\"high|main|main 10\",\"Condition\":\"EqualsAny\",\"Property\":\"VideoProfile\"},{\"Property\":\"VideoLevel\",\"Value\":\"175\",\"Condition\":\"LessThanEqual\",\"IsRequired\":false},{\"IsRequired\":false,\"Value\":\"true\",\"Condition\":\"NotEquals\",\"Property\":\"IsInterlaced\"}],\"Type\":\"Video\"}],\"MaxStreamingBitrate\":40000000,\"TranscodingProfiles\":[{\"Container\":\"ts\",\"AudioCodec\":\"aac,mp3,wav,ac3,eac3,flac,opus\",\"VideoCodec\":\"hevc,h264,mpeg4\",\"BreakOnNonKeyFrames\":true,\"Type\":\"Video\",\"MaxAudioChannels\":\"6\",\"Protocol\":\"hls\",\"Context\":\"Streaming\",\"MinSegments\":2}],\"DirectPlayProfiles\":[{\"Container\":\"mov,mp4,mkv,hls,webm\",\"Type\":\"Video\",\"VideoCodec\":\"h264,hevc,dvhe,dvh1,h264,hevc,hev1,mpeg4,vp9\",\"AudioCodec\":\"aac,mp3,wav,ac3,eac3,flac,truehd,dts,dca,opus,pcm,pcm_s24le\"}],\"ResponseProfiles\":[{\"MimeType\":\"video/mp4\",\"Type\":\"Video\",\"Container\":\"m4v\"}],\"ContainerProfiles\":[],\"MusicStreamingTranscodingBitrate\":40000000,\"MaxStaticBitrate\":40000000}}"
r = requests.post(url, params=params, data=data, headers=header, timeout=120, proxies={"http": self.proxy, "https": self.proxy})
url = self.baseUrl + r.json()['MediaSources'][0]['DirectStreamUrl']
if int(self.thread) > 0:
try:
self.fetch('http://127.0.0.1:7777', timeout=120)
except:
self.fetch('http://127.0.0.1:9978/go')
url = f'http://127.0.0.1:7777/?url={quote(url)}&thread={self.thread}'
result = {
"url": url,
"header": self.header,
"parse": 0
}
return result
def localProxy(self, params):
pass
def getAccessToken(self):
key = f"emby_{self.baseUrl}_{self.username}_{self.password }"
embyInfos = self.getCache(key)
if embyInfos:
return embyInfos
header = self.header.copy()
header['Content-Type'] = "application/json; charset=UTF-8"
# r = requests.post(f"{self.baseUrl}/emby/Users/AuthenticateByName", params={"Username": self.username, "Password": self.password , "Pw": self.password , "X-Emby-Client": "Yamby", "X-Emby-Device-Name": "Yamby", "X-Emby-Device-Id": str(uuid4()), "X-Emby-Client-Version": "1.0.2"}, headers=header, timeout=120, proxies={"http": self.proxy, "https": self.proxy}
r = requests.post(f"{self.baseUrl}/emby/Users/AuthenticateByName", params={"Username": self.username, "Password": self.password , "Pw": self.password , "X-Emby-Client": "Emby Theater", "X-Emby-Device-Name": "DESKTOP-C25AFR6", "X-Emby-Device-Id": str(uuid4()), "X-Emby-Client-Version": "3.0.20-3.0"}, headers=header, timeout=120, proxies={"http": self.proxy, "https": self.proxy})
embyInfos = r.json()
self.setCache(key, embyInfos)
return embyInfos
#header = {"User-Agent": "Yamby/1.0.2(Android"}
header = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) EmbyTheater/3.0.20-3.0 Chrome/100.0.4896.160 Electron/18.3.15 Safari/537.36"}

231
py/py_phu.py Normal file
View File

@ -0,0 +1,231 @@
# coding=utf-8
# !/usr/bin/python
# by嗷呜
import json
import re
import sys
from pyquery import PyQuery as pq
from base64 import b64decode, b64encode
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def init(self, extend=""):
pass
def getName(self):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def destroy(self):
pass
host = "https://www.pornhub.com"
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.5410.0 Safari/537.36',
'pragma': 'no-cache',
'cache-control': 'no-cache',
'sec-ch-ua-platform': '"Windows"',
'sec-ch-ua': '"Not(A:Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"',
'dnt': '1',
'sec-ch-ua-mobile': '?0',
'origin': host,
'sec-fetch-site': 'cross-site',
'sec-fetch-mode': 'cors',
'sec-fetch-dest': 'empty',
'referer': f'{host}/',
'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
'priority': 'u=1, i',
}
def homeContent(self, filter):
result = {}
cateManual = {
"视频": "/video",
"片单": "/playlists",
"频道": "/channels",
"分类": "/categories",
"明星": "/pornstars"
}
classes = []
filters = {}
for k in cateManual:
classes.append({
'type_name': k,
'type_id': cateManual[k]
})
result['class'] = classes
result['filters'] = filters
return result
def homeVideoContent(self):
data = self.getpq('/recommended')
vhtml = data("#recommendedListings .pcVideoListItem .phimage")
return {'list':self.getlist(vhtml)}
def categoryContent(self, tid, pg, filter, extend):
vdata = []
result = {}
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
if tid=='/video' or '_this_video' in tid:
pagestr = f'&' if '?' in tid else f'?'
tid=tid.split('_this_video')[0]
data=self.getpq(f'{tid}{pagestr}page={pg}')
vdata=self.getlist(data('#videoCategory .pcVideoListItem'))
elif tid == '/playlists':
data=self.getpq(f'{tid}?page={pg}')
vhtml=data('#playListSection li')
vdata = []
for i in vhtml.items():
vdata.append({
'vod_id': 'playlists_click_' + i('.thumbnail-info-wrapper .display-block a').attr('href'),
'vod_name': i('.thumbnail-info-wrapper .display-block a').attr('title'),
'vod_pic': i('.largeThumb').attr('src'),
'vod_tag': 'folder',
'vod_remarks': i('.playlist-videos .number').text(),
'style': {"type": "rect", "ratio": 1.33}
})
elif tid=='/channels':
data=self.getpq(f'{tid}?o=rk&page={pg}')
vhtml=data('#filterChannelsSection li .description')
vdata=[]
for i in vhtml.items():
vdata.append({
'vod_id': 'director_click_'+i('.avatar a').attr('href'),
'vod_name': i('.avatar img').attr('alt'),
'vod_pic': i('.avatar img').attr('src'),
'vod_tag':'folder',
'vod_remarks': i('.descriptionContainer ul li').eq(-1).text(),
'style':{"type": "rect", "ratio": 1.33}
})
elif tid=='/categories' and pg=='1':
result['pagecount'] = 1
data=self.getpq(f'{tid}')
vhtml=data('.categoriesListSection li .relativeWrapper')
vdata=[]
for i in vhtml.items():
vdata.append({
'vod_id': i('a').attr('href')+'_this_video',
'vod_name': i('a').attr('alt'),
'vod_pic': i('a img').attr('src'),
'vod_tag':'folder',
'style':{"type": "rect", "ratio": 1.33}
})
elif tid=='/pornstars':
data=self.getpq(f'{tid}?o=t&page={pg}')
vhtml=data('#popularPornstars .performerCard .wrap')
vdata=[]
for i in vhtml.items():
vdata.append({
'vod_id': 'pornstars_click_'+i('a').attr('href'),
'vod_name': i('.performerCardName').text(),
'vod_pic': i('a img').attr('src'),
'vod_tag':'folder',
'vod_year':i('.performerVideosViewsCount span').eq(0).text(),
'vod_remarks': i('.performerVideosViewsCount span').eq(-1).text(),
'style':{"type": "rect", "ratio": 1.33}
})
elif 'playlists_click' in tid:
tid=tid.split('click_')[-1]
if pg=='1':
hdata=self.getpq(tid)
self.token=hdata('.searchInput').attr('data-token')
tid=tid.split('playlist/')[-1]
data=self.getpq(f'/playlist/viewChunked?id={tid}&token={self.token}&page={pg}')
vdata=self.getlist(data('.pcVideoListItem .phimage'))
elif 'director_click' in tid:
tid=tid.split('click_')[-1]
data=self.getpq(f'{tid}/videos?page={pg}')
vdata=self.getlist(data('#showAllChanelVideos .pcVideoListItem .phimage'))
elif 'pornstars_click' in tid:
tid=tid.split('click_')[-1]
data=self.getpq(f'{tid}/videos?page={pg}')
vdata=self.getlist(data('#mostRecentVideosSection .pcVideoListItem .phimage'))
result['list'] = vdata
return result
def detailContent(self, ids):
url = f"{self.host}{ids[0]}"
data = self.getpq(ids[0])
vn=data('meta[property="og:title"]').attr('content')
dtext=data('.userInfo .usernameWrap a')
pdtitle = '[a=cr:' + json.dumps({'id': 'director_click_'+dtext.attr('href'), 'name': dtext.text()}) + '/]' + dtext.text() + '[/a]'
vod = {
'vod_name': vn,
'vod_director':pdtitle,
'vod_remarks': (data('.userInfo').text()+' / '+data('.ratingInfo').text()).replace('\n',' / '),
'vod_play_from': 'Pornhub',
'vod_play_url': ''
}
js_content = data("#player script").eq(0).text()
plist = [f"{vn}${self.e64(f'{1}@@@@{url}')}"]
try:
pattern = r'"mediaDefinitions":\s*(\[.*?\]),\s*"isVertical"'
match = re.search(pattern, js_content, re.DOTALL)
if match:
json_str = match.group(1)
udata = json.loads(json_str)
plist = [
f"{media['height']}${self.e64(f'{0}@@@@{url}')}"
for media in udata[:-1]
if (url := media.get('videoUrl'))
]
except Exception as e:
print(f"提取mediaDefinitions失败: {str(e)}")
vod['vod_play_url'] = '#'.join(plist)
return {'list':[vod]}
def searchContent(self, key, quick, pg="1"):
data=self.getpq(f'/video/search?search={key}&page={pg}')
return {'list':self.getlist(data('#videoSearchResult .pcVideoListItem .phimage'))}
def playerContent(self, flag, id, vipFlags):
ids=self.d64(id).split('@@@@')
return {'parse': int(ids[0]), 'url': ids[1], 'header': self.headers}
def localProxy(self, param):
pass
def e64(self, text):
try:
text_bytes = text.encode('utf-8')
encoded_bytes = b64encode(text_bytes)
return encoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64编码错误: {str(e)}")
return ""
def d64(self,encoded_text):
try:
encoded_bytes = encoded_text.encode('utf-8')
decoded_bytes = b64decode(encoded_bytes)
return decoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64解码错误: {str(e)}")
return ""
def getlist(self, data):
vlist=[]
for i in data.items():
vlist.append({
'vod_id': i('a').attr('href'),
'vod_name': i('a').attr('title'),
'vod_pic': i('img').attr('src'),
'vod_remarks': i('.bgShadeEffect').text() or i('.duration').text(),
})
return vlist
def getpq(self,path):
data=self.fetch(f'{self.host}{path}', headers=self.headers).text
return pq(self.cleanText(data))

View File

@ -207,7 +207,8 @@ class Spider(Spider):
return {'list':videos,'page':pg} return {'list':videos,'page':pg}
def playerContent(self, flag, id, vipFlags): def playerContent(self, flag, id, vipFlags):
return {'parse': 1, 'url': id, 'header': ''} video_url = f'https://jx.xmflv.com/?url={id}'
return {'parse': 1, 'url': video_url, 'header': ''}
def localProxy(self, param): def localProxy(self, param):
pass pass

View File

@ -261,8 +261,9 @@ class Spider(Spider):
def playerContent(self, flag, id, vipFlags): def playerContent(self, flag, id, vipFlags):
ids = id.split('@') ids = id.split('@')
url = f"{self.host}/x/cover/{ids[0]}/{ids[1]}.html" url = f"{self.host}/x/cover/{ids[0]}/{ids[1]}.html"
return {'parse': 1, 'url': url, 'header': ''} parse_url = f"https://jx.xmflv.com/?url={url}"
return {'parse': 1, 'url': parse_url, 'header': ''}
def localProxy(self, param): def localProxy(self, param):
pass pass

148
py/pyhitv.py Normal file
View File

@ -0,0 +1,148 @@
# coding=utf-8
# !/usr/bin/python
# 嗷呜
import sys
sys.path.append('..')
from base.spider import Spider
import requests
class Spider(Spider):
def init(self, extend=""):
pass
def getName(self):
return "hitv"
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def destroy(self):
pass
def homeContent(self, filter):
result = {}
cateManual = {
# "直播": "live",
'排行榜': 'rank',
"电影": "1",
"剧集": "2",
"综艺": "3",
"动画": "4",
"短片": "5"
}
classes = []
for k in cateManual:
classes.append({
'type_name': k,
'type_id': cateManual[k]
})
result['class'] = classes
return result
host = "https://wys.upfuhn.com"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/80.0.3987.149 Safari/537.36"
}
def list(self, list):
videos = []
for it in list:
videos.append({
"vod_id": it['video_site_id'],
"vod_name": it['video_name'],
"vod_pic": it['video_horizontal_url'] or it['video_vertical_url'],
"vod_remarks": it['newest_series_num'],
"vod_year": it['years'],
})
return videos
def homeVideoContent(self):
url = f'{self.host}/v1/ys_video_sites/hot?t=1'
data = requests.get(url, headers=self.headers).json()
videos = self.list(data['data']['data'])
result = {'list': videos}
return result
def categoryContent(self, tid, pg, filter, extend):
path = f'/v1/ys_video_sites?t={tid}&s_t=0&a&y&o=0&ps=21&pn={pg}'
rank = False
if tid == 'rank':
if pg == 1:
path = f'/v1/ys_video_sites/ranking'
rank = True
else:
path = ''
# elif tid == 'live' and pg == 1:
# path = f'/v1/ys_live_tvs'
videos = []
result = {}
try:
data = requests.get(self.host + path, headers=self.headers).json()
if rank:
for video in data['data']:
videos.extend(data['data'][video])
else:
videos = data['data']['data']
result = {}
result['list'] = self.list(videos)
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
except:
result['list'] = []
return result
def detailContent(self, ids):
tid = ids[0]
url = f'{self.host}/v1/ys_video_series/by_vid/{tid}'
data = requests.get(url, headers=self.headers).json()
data1 = data['data']['ys_video_site']
urls = []
for it in data['data']['data']:
urls.append(it['series_num'] + '$' + it['video_url'])
vod = {
'vod_name': data1['video_name'],
'type_name': data1['tag'],
'vod_year': data1['years'],
'vod_area': data1['area'],
'vod_director': data1['main_actor'],
'vod_content': data1['video_desc'],
'vod_play_from': '嗷呜在线',
'vod_play_url': '#'.join(urls),
}
result = {
'list': [
vod
]
}
return result
def searchContent(self, key, quick, pg=1):
url = f'{self.host}/v1/ys_video_sites/search?s={key}&o=0&ps=200&pn={pg}'
data = requests.get(url, headers=self.headers).json()
videos = data['data']['video_sites']
if data['data']['first_video_series'] is not None:
videos = [data['data']['first_video_series']] + videos
result = {}
result['list'] = self.list(videos)
result['page'] = pg
return result
def playerContent(self, flag, id, vipFlags):
result = {
'url': id,
'parse': 0,
'header': self.headers
}
return result
def localProxy(self, param):
pass

94
py/pymp.py Normal file
View File

@ -0,0 +1,94 @@
# coding=utf-8
# !/usr/bin/python
import sys
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def getName(self):
return "mp"
def init(self, extend=""):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def destroy(self):
pass
host = 'https://g.c494.com'
header = {
'User-Agent': 'Dart/2.10 (dart:io)',
'platform_version': 'RP1A.200720.011',
'version': '2.2.3',
'copyright': 'xiaogui',
'platform': 'android',
'client_name': '576O5p+P5b2x6KeG',
}
def homeContent(self, filter):
data = self.fetch(f'{self.host}/api.php/app/nav?token=', headers=self.header).json()
dy = {"class": "类型", "area": "地区", "lang": "语言", "year": "年份", "letter": "字母", "by": "排序",
"sort": "排序"}
filters = {}
classes = []
json_data = data["list"]
for item in json_data:
has_non_empty_field = False
jsontype_extend = item["type_extend"]
classes.append({"type_name": item["type_name"], "type_id": str(item["type_id"])})
for key in dy:
if key in jsontype_extend and jsontype_extend[key].strip() != "":
has_non_empty_field = True
break
if has_non_empty_field:
filters[str(item["type_id"])] = []
for dkey in jsontype_extend:
if dkey in dy and jsontype_extend[dkey].strip() != "":
values = jsontype_extend[dkey].split(",")
value_array = [{"n": value.strip(), "v": value.strip()} for value in values if
value.strip() != ""]
filters[str(item["type_id"])].append({"key": dkey, "name": dy[dkey], "value": value_array})
result = {}
result["class"] = classes
result["filters"] = filters
return result
def homeVideoContent(self):
rsp = self.fetch(f"{self.host}/api.php/app/index_video?token=", headers=self.header)
root = rsp.json()['list']
videos = [item for vodd in root for item in vodd['vlist']]
return {'list': videos}
def categoryContent(self, tid, pg, filter, extend):
parms = {"pg": pg, "tid": tid, "class": extend.get("class", ""), "area": extend.get("area", ""),
"lang": extend.get("lang", ""), "year": extend.get("year", ""), "token": ""}
data = self.fetch(f'{self.host}/api.php/app/video', params=parms, headers=self.header).json()
return data
def detailContent(self, ids):
parms = {"id": ids[0], "token": ""}
data = self.fetch(f'{self.host}/api.php/app/video_detail', params=parms, headers=self.header).json()
vod = data['data']
vod.pop('pause_advert_list', None)
vod.pop('init_advert_list', None)
vod.pop('vod_url_with_player', None)
return {"list": [vod]}
def searchContent(self, key, quick, pg='1'):
parms = {'pg': pg, 'text': key, 'token': ''}
data = self.fetch(f'{self.host}/api.php/app/search', params=parms, headers=self.header).json()
return data
def playerContent(self, flag, id, vipFlags):
return {"parse": 0, "url": id, "header": {'User-Agent': 'User-Agent: Lavf/58.12.100'}}
def localProxy(self, param):
pass

172
py/pyxpg.py Normal file
View File

@ -0,0 +1,172 @@
# coding=utf-8
# !/usr/bin/python
import sys
sys.path.append('')
from base.spider import Spider
from urllib.parse import quote
class Spider(Spider):
def getName(self):
return "xpg"
def init(self, extend=""):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def destroy(self):
pass
def homeContent(self, filter):
data = self.fetch(
"{0}/api.php/v2.vod/androidtypes".format(self.host),
headers=self.header,
).json()
dy = {
"classes": "类型",
"areas": "地区",
"years": "年份",
"sortby": "排序",
}
filters = {}
classes = []
for item in data['data']:
has_non_empty_field = False
item['soryby'] = ['updatetime', 'hits', 'score']
demos = ['时间', '人气', '评分']
classes.append({"type_name": item["type_name"], "type_id": str(item["type_id"])})
for key in dy:
if key in item and len(item[key]) > 1:
has_non_empty_field = True
break
if has_non_empty_field:
filters[str(item["type_id"])] = []
for dkey in item:
if dkey in dy and len(item[dkey]) > 1:
values = item[dkey]
value_array = [
{"n": demos[idx] if dkey == "sortby" else value.strip(), "v": value.strip()}
for idx, value in enumerate(values)
if value.strip() != ""
]
filters[str(item["type_id"])].append(
{"key": dkey, "name": dy[dkey], "value": value_array}
)
result = {}
result["class"] = classes
result["filters"] = filters
return result
host = "http://item.xpgtv.com"
header = {
'User-Agent': 'okhttp/3.12.11',
'token': 'ElEDlwCVgXcFHFhddiq2JKteHofExRBUrfNlmHrWetU3VVkxnzJAodl52N9EUFS+Dig2A/fBa/V9RuoOZRBjYvI+GW8kx3+xMlRecaZuECdb/3AdGkYpkjW3wCnpMQxf8vVeCz5zQLDr8l8bUChJiLLJLGsI+yiNskiJTZz9HiGBZhZuWh1mV1QgYah5CLTbSz8=',
'token2': 'a0kEsBKRgTkBZ29NZ3WcNKN/C4T00RN/hNkmmGa5JMBeEENnqydLoetm/t8=',
'user_id': 'XPGBOX',
'version': 'XPGBOX com.phoenix.tv1.5.3',
'timestamp': '1732286435',
'hash': 'd9ab',
}
def homeVideoContent(self):
rsp = self.fetch("{0}/api.php/v2.main/androidhome".format(self.host), headers=self.header)
root = rsp.json()['data']['list']
videos = []
for vodd in root:
for vod in vodd['list']:
videos.append({
"vod_id": vod['id'],
"vod_name": vod['name'],
"vod_pic": vod['pic'],
"vod_remarks": vod['score']
})
result = {
'list': videos
}
return result
def categoryContent(self, tid, pg, filter, extend):
parms = []
parms.append(f"page={pg}")
parms.append(f"type={tid}")
if extend.get('areas'):
parms.append(f"area={quote(extend['areaes'])}")
if extend.get('years'):
parms.append(f"year={quote(extend['yeares'])}")
if extend.get('sortby'):
parms.append(f"sortby={extend['sortby']}")
if extend.get('classes'):
parms.append(f"class={quote(extend['classes'])}")
parms = "&".join(parms)
result = {}
url = '{0}/api.php/v2.vod/androidfilter10086?{1}'.format(self.host, parms)
rsp = self.fetch(url, headers=self.header)
root = rsp.json()['data']
videos = []
for vod in root:
videos.append({
"vod_id": vod['id'],
"vod_name": vod['name'],
"vod_pic": vod['pic'],
"vod_remarks": vod['score']
})
result['list'] = videos
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
return result
def detailContent(self, ids):
id = ids[0]
url = '{0}/api.php/v3.vod/androiddetail2?vod_id={1}'.format(self.host, id)
rsp = self.fetch(url, headers=self.header)
root = rsp.json()['data']
node = root['urls']
d = [it['key'] + "$" + f"http://c.xpgtv.net/m3u8/{it['url']}.m3u8" for it in node]
vod = {
"vod_name": root['name'],
'vod_play_from': '小苹果',
'vod_play_url': '#'.join(d),
}
print(vod)
result = {
'list': [
vod
]
}
return result
def searchContent(self, key, quick, pg='1'):
url = '{0}/api.php/v2.vod/androidsearch10086?page={1}&wd={2}'.format(self.host, pg, key)
rsp = self.fetch(url, headers=self.header)
root = rsp.json()['data']
videos = []
for vod in root:
videos.append({
"vod_id": vod['id'],
"vod_name": vod['name'],
"vod_pic": vod['pic'],
"vod_remarks": vod['score']
})
result = {
'list': videos
}
return result
def playerContent(self, flag, id, vipFlags):
result = {}
result["parse"] = 0
result["url"] = id
result["header"] = self.header
return result
def localProxy(self, param):
pass

195
py/py光速.py Normal file
View File

@ -0,0 +1,195 @@
# coding=utf-8
# !/usr/bin/python
# by嗷呜
import re
import sys
from urllib.parse import quote
from Crypto.Hash import MD5
sys.path.append("..")
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from base64 import b64encode, b64decode
import json
import time
from base.spider import Spider
class Spider(Spider):
def getName(self):
return "光速"
def init(self, extend=""):
self.host = self.gethost()
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def action(self, action):
pass
def destroy(self):
pass
def homeContent(self, filter):
data = self.getdata("/api.php/getappapi.index/initV119")
dy = {"class": "类型", "area": "地区", "lang": "语言", "year": "年份", "letter": "字母", "by": "排序",
"sort": "排序", }
filters = {}
classes = []
json_data = data["type_list"]
homedata = data["banner_list"]
for item in json_data:
if item["type_name"] == "全部":
continue
has_non_empty_field = False
jsontype_extend = json.loads(item["type_extend"])
homedata.extend(item["recommend_list"])
jsontype_extend["sort"] = "最新,最热,最赞"
classes.append({"type_name": item["type_name"], "type_id": item["type_id"]})
for key in dy:
if key in jsontype_extend and jsontype_extend[key].strip() != "":
has_non_empty_field = True
break
if has_non_empty_field:
filters[str(item["type_id"])] = []
for dkey in jsontype_extend:
if dkey in dy and jsontype_extend[dkey].strip() != "":
values = jsontype_extend[dkey].split(",")
value_array = [{"n": value.strip(), "v": value.strip()} for value in values if
value.strip() != ""]
filters[str(item["type_id"])].append({"key": dkey, "name": dy[dkey], "value": value_array})
result = {}
result["class"] = classes
result["filters"] = filters
result["list"] = homedata
return result
def homeVideoContent(self):
pass
def categoryContent(self, tid, pg, filter, extend):
body = {"area": extend.get('area', '全部'), "year": extend.get('year', '全部'), "type_id": tid, "page": pg,
"sort": extend.get('sort', '最新'), "lang": extend.get('lang', '全部'),
"class": extend.get('class', '全部')}
result = {}
data = self.getdata("/api.php/getappapi.index/typeFilterVodList", body)
result["list"] = data["recommend_list"]
result["page"] = pg
result["pagecount"] = 9999
result["limit"] = 90
result["total"] = 999999
return result
def detailContent(self, ids):
body = f"vod_id={ids[0]}"
data = self.getdata("/api.php/getappapi.index/vodDetail", body)
vod = data["vod"]
play = []
names = []
for itt in data["vod_play_list"]:
a = []
names.append(itt["player_info"]["show"])
parse = itt["player_info"]["parse"]
ua = ''
if itt["player_info"].get("user_agent", ''):
ua = b64encode(itt["player_info"]["user_agent"].encode('utf-8')).decode('utf-8')
for it in itt["urls"]:
url = it["url"]
if not re.search(r'\.m3u8|\.mp4', url):
url = parse + '@@' + url
url = b64encode(url.encode('utf-8')).decode('utf-8')
a.append(f"{it['name']}${url}|||{ua}|||{it['token']}")
play.append("#".join(a))
vod["vod_play_from"] = "$$$".join(names)
vod["vod_play_url"] = "$$$".join(play)
result = {"list": [vod]}
return result
def searchContent(self, key, quick, pg="1"):
body = f"keywords={key}&type_id=0&page={pg}"
data = self.getdata("/api.php/getappapi.index/searchList", body)
result = {"list": data["search_list"], "page": pg}
return result
phend = {
'User-Agent': 'Dalvik/2.1.0 (Linux; U; Android 11; M2012K10C Build/RP1A.200720.011)'}
def playerContent(self, flag, id, vipFlags):
ids = id.split("|||")
if ids[1]: self.phend['User-Agent'] = b64decode(ids[1]).decode('utf-8')
url = b64decode(ids[0]).decode('utf-8')
if not re.search(r'\.m3u8|\.mp4', url):
a = url.split("@@")
body = f"parse_api={a[0]}&url={quote(self.aes('encrypt', a[1]))}&token={ids[-1]}"
jd = self.getdata("/api.php/getappapi.index/vodParse", body)['json']
url = json.loads(jd)['url']
# if '.mp4' not in url:
# l=self.fetch(url, headers=self.phend,allow_redirects=False)
# if l.status_code == 200 and l.headers.get('Location',''):
# url=l.headers['Location']
if '.jpg' in url or '.png' in url or '.jpeg' in url:
url = self.getProxyUrl() + "&url=" + b64encode(url.encode('utf-8')).decode('utf-8') + "&type=m3u8"
result = {}
result["parse"] = 0
result["url"] = url
result["header"] = self.phend
return result
def localProxy(self, param):
url = b64decode(param["url"]).decode('utf-8')
durl = url[:url.rfind('/')]
data = self.fetch(url, headers=self.phend).content.decode("utf-8")
inde = None
pd = True
lines = data.strip().split('\n')
for index, string in enumerate(lines):
# if '#EXT-X-DISCONTINUITY' in string and pd:
# pd = False
# inde = index
if '#EXT' not in string and 'http' not in string:
lines[index] = durl + ('' if string.startswith('/') else '/') + string
if inde:
del lines[inde:inde + 4]
data = '\n'.join(lines)
return [200, "application/vnd.apple.mpegur", data]
def gethost(self):
host = self.fetch('https://jingyu-1312635929.cos.ap-nanjing.myqcloud.com/1.json').text.strip()
return host
def aes(self, operation, text):
key = "4d83b87c4c5ea111".encode("utf-8")
iv = key
if operation == "encrypt":
cipher = AES.new(key, AES.MODE_CBC, iv)
ct_bytes = cipher.encrypt(pad(text.encode("utf-8"), AES.block_size))
ct = b64encode(ct_bytes).decode("utf-8")
return ct
elif operation == "decrypt":
cipher = AES.new(key, AES.MODE_CBC, iv)
pt = unpad(cipher.decrypt(b64decode(text)), AES.block_size)
return pt.decode("utf-8")
def header(self):
t = str(int(time.time()))
md5_hash = MD5.new()
md5_hash.update(t.encode('utf-8'))
signature_md5 = md5_hash.hexdigest()
header = {"User-Agent": "okhttp/3.14.9", "app-version-code": "300", "app-ui-mode": "light",
"app-user-device-id": signature_md5, "app-api-verify-time": t,
"app-api-verify-sign": self.aes("encrypt", t), "Content-Type": "application/x-www-form-urlencoded"}
return header
def getdata(self, path, data=None):
# data = self.post(self.host + path, headers=self.header(), data=data).text
data = self.post(self.host + path, headers=self.header(), data=data, verify=False).json()["data"]
data1 = self.aes("decrypt", data)
return json.loads(data1)

265
py/xhm.py Normal file
View File

@ -0,0 +1,265 @@
# coding=utf-8
# !/usr/bin/python
# by嗷呜
import json
import sys
from base64 import b64decode, b64encode
from pyquery import PyQuery as pq
from requests import Session
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def init(self, extend=""):
self.host = self.gethost()
self.headers['referer'] = f'{self.host}/'
self.session = Session()
self.session.headers.update(self.headers)
pass
def getName(self):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def destroy(self):
pass
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'sec-ch-ua': '"Not(A:Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-full-version': '"133.0.6943.98"',
'sec-ch-ua-arch': '"x86"',
'sec-ch-ua-platform': '"Windows"',
'sec-ch-ua-platform-version': '"19.0.0"',
'sec-ch-ua-model': '""',
'sec-ch-ua-full-version-list': '"Not(A:Brand";v="99.0.0.0", "Google Chrome";v="133.0.6943.98", "Chromium";v="133.0.6943.98"',
'dnt': '1',
'upgrade-insecure-requests': '1',
'sec-fetch-site': 'none',
'sec-fetch-mode': 'navigate',
'sec-fetch-user': '?1',
'sec-fetch-dest': 'document',
'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
'priority': 'u=0, i'
}
def homeContent(self, filter):
result = {}
cateManual = {
"4K": "/4k",
"国产": "two_click_/categories/chinese",
"最新": "/newest",
"最佳": "/best",
"频道": "/channels",
"类别": "/categories",
"明星": "/pornstars"
}
classes = []
filters = {}
for k in cateManual:
classes.append({
'type_name': k,
'type_id': cateManual[k]
})
if k !='4K':filters[cateManual[k]]=[{'key':'type','name':'类型','value':[{'n':'4K','v':'/4k'}]}]
result['class'] = classes
result['filters'] = filters
return result
def homeVideoContent(self):
data = self.getpq()
return {'list': self.getlist(data(".thumb-list--sidebar .thumb-list__item"))}
def categoryContent(self, tid, pg, filter, extend):
vdata = []
result = {}
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
if tid in ['/4k', '/newest', '/best'] or 'two_click_' in tid:
if 'two_click_' in tid: tid = tid.split('click_')[-1]
data = self.getpq(f'{tid}{extend.get("type","")}/{pg}')
vdata = self.getlist(data(".thumb-list--sidebar .thumb-list__item"))
elif tid == '/channels':
data = self.getpq(f'{tid}/{pg}')
jsdata = self.getjsdata(data)
for i in jsdata['channels']:
vdata.append({
'vod_id': f"two_click_" + i.get('channelURL'),
'vod_name': i.get('channelName'),
'vod_pic': i.get('siteLogoURL'),
'vod_year': f'videos:{i.get("videoCount")}',
'vod_tag': 'folder',
'vod_remarks': f'subscribers:{i["subscriptionModel"].get("subscribers")}',
'style': {'ratio': 1.33, 'type': 'rect'}
})
elif tid == '/categories':
result['pagecount'] = pg
data = self.getpq(tid)
self.cdata = self.getjsdata(data)
for i in self.cdata['layoutPage']['store']['popular']['assignable']:
vdata.append({
'vod_id': "one_click_" + i.get('id'),
'vod_name': i.get('name'),
'vod_pic': '',
'vod_tag': 'folder',
'style': {'ratio': 1.33, 'type': 'rect'}
})
elif tid == '/pornstars':
data = self.getpq(f'{tid}/{pg}')
pdata = self.getjsdata(data)
for i in pdata['pagesPornstarsComponent']['pornstarListProps']['pornstars']:
vdata.append({
'vod_id': f"two_click_" + i.get('pageURL'),
'vod_name': i.get('name'),
'vod_pic': i.get('imageThumbUrl'),
'vod_remarks': i.get('translatedCountryName'),
'vod_tag': 'folder',
'style': {'ratio': 1.33, 'type': 'rect'}
})
elif 'one_click' in tid:
result['pagecount'] = pg
tid = tid.split('click_')[-1]
for i in self.cdata['layoutPage']['store']['popular']['assignable']:
if i.get('id') == tid:
for j in i['items']:
vdata.append({
'vod_id': f"two_click_" + j.get('url'),
'vod_name': j.get('name'),
'vod_pic': j.get('thumb'),
'vod_tag': 'folder',
'style': {'ratio': 1.33, 'type': 'rect'}
})
result['list'] = vdata
return result
def detailContent(self, ids):
data = self.getpq(ids[0])
djs = self.getjsdata(data)
vn = data('meta[property="og:title"]').attr('content')
dtext = data('#video-tags-list-container')
href = dtext('a').attr('href')
title = dtext('span[class*="body-bold-"]').eq(0).text()
pdtitle = ''
if href:
pdtitle = '[a=cr:' + json.dumps({'id': 'two_click_' + href, 'name': title}) + '/]' + title + '[/a]'
vod = {
'vod_name': vn,
'vod_director': pdtitle,
'vod_remarks': data('.rb-new__info').text(),
'vod_play_from': 'Xhamster',
'vod_play_url': ''
}
try:
plist = []
d = djs['xplayerSettings']['sources']
f = d.get('standard')
ah=[]
if d.get('hls'):
for format_type, info in d['hls'].items():
if url := info.get('url'):
encoded = self.e64(f'{0}@@@@{url}')
plist.append(f"{format_type}${encoded}")
if f:
for key, value in f.items():
if isinstance(value, list):
ah.extend(value)
if len(ah):
for info in ah:
id = self.e64(f'{0}@@@@{info.get("url") or info.get("fallback")}')
plist.append(f"{info.get('label') or info.get('quality')}${id}")
except Exception as e:
plist = [f"{vn}${self.e64(f'{1}@@@@{ids[0]}')}"]
print(f"获取视频信息失败: {str(e)}")
vod['vod_play_url'] = '#'.join(plist)
return {'list': [vod]}
def searchContent(self, key, quick, pg="1"):
data = self.getpq(f'/search/{key}?page={pg}')
return {'list': self.getlist(data(".thumb-list--sidebar .thumb-list__item")), 'page': pg}
def playerContent(self, flag, id, vipFlags):
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.5410.0 Safari/537.36',
'pragma': 'no-cache',
'cache-control': 'no-cache',
'sec-ch-ua-platform': '"Windows"',
'sec-ch-ua': '"Not(A:Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"',
'dnt': '1',
'sec-ch-ua-mobile': '?0',
'origin': self.host,
'sec-fetch-site': 'cross-site',
'sec-fetch-mode': 'cors',
'sec-fetch-dest': 'empty',
'referer': f'{self.host}/',
'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
'priority': 'u=1, i',
}
ids = self.d64(id).split('@@@@')
return {'parse': int(ids[0]), 'url': ids[1], 'header': headers}
def localProxy(self, param):
pass
def gethost(self):
try:
response = self.fetch('http://127.0.0.1:10079/p/0/127.0.0.1:10172/https://xhamster.com', headers=self.headers, allow_redirects=False)
return response.headers['Location']
except Exception as e:
print(f"获取主页失败: {str(e)}")
return "http://127.0.0.1:10079/p/0/127.0.0.1:10172/https://zn.xhamster.com"
def e64(self, text):
try:
text_bytes = text.encode('utf-8')
encoded_bytes = b64encode(text_bytes)
return encoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64编码错误: {str(e)}")
return ""
def d64(self, encoded_text):
try:
encoded_bytes = encoded_text.encode('utf-8')
decoded_bytes = b64decode(encoded_bytes)
return decoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64解码错误: {str(e)}")
return ""
def getlist(self, data):
vlist = []
for i in data.items():
vlist.append({
'vod_id': i('.role-pop').attr('href'),
'vod_name': i('.video-thumb-info a').text(),
'vod_pic': i('.role-pop img').attr('src'),
'vod_year': i('.video-thumb-info .video-thumb-views').text().split(' ')[0],
'vod_remarks': i('.role-pop div[data-role="video-duration"]').text(),
'style': {'ratio': 1.33, 'type': 'rect'}
})
return vlist
def getpq(self, path=''):
h = '' if path.startswith('http') else self.host
response = self.session.get(f'{h}{path}').text
try:
return pq(response)
except Exception as e:
print(f"{str(e)}")
return pq(response.encode('utf-8'))
def getjsdata(self, data):
vhtml = data("script[id='initials-script']").text()
jst = json.loads(vhtml.split('initials=')[-1][:-1])
return jst

176
py/小红书.py Normal file
View File

@ -0,0 +1,176 @@
# coding=utf-8
# !/usr/bin/python
# by嗷呜
import json
import random
import sys
import time
from base64 import b64decode
from Crypto.Cipher import AES
from Crypto.Hash import MD5
from Crypto.Util.Padding import unpad
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def getName(self):
return "小红书"
def init(self, extend=""):
self.did = self.random_str(32)
self.token,self.phost = self.gettoken()
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def destroy(self):
pass
def random_str(self,length=16):
hex_chars = '0123456789abcdef'
return ''.join(random.choice(hex_chars) for _ in range(length))
def md5(self, text: str) -> str:
h = MD5.new()
h.update(text.encode('utf-8'))
return h.hexdigest()
def homeContent(self, filter):
data = self.fetch(f'{self.host}/api/video/queryClassifyList?mark=4', headers=self.headers()).json()['encData']
data1 = self.aes(data)
result = {}
classes = []
for k in data1['data']:
classes.append({'type_name': k['classifyTitle'], 'type_id': k['classifyId']})
result['class'] = classes
return result
def homeVideoContent(self):
pass
def categoryContent(self, tid, pg, filter, extend):
path=f'/api/short/video/getShortVideos?classifyId={tid}&videoMark=4&page={pg}&pageSize=20'
result = {}
videos = []
data=self.fetch(f'{self.host}{path}', headers=self.headers()).json()['encData']
vdata=self.aes(data)
for k in vdata['data']:
videos.append({"vod_id": k['videoId'], 'vod_name': k.get('title'), 'vod_pic': self.getProxyUrl() + '&url=' + k['coverImg'],
'vod_remarks': self.dtim(k.get('playTime'))})
result["list"] = videos
result["page"] = pg
result["pagecount"] = 9999
result["limit"] = 90
result["total"] = 999999
return result
def detailContent(self, ids):
path = f'/api/video/getVideoById?videoId={ids[0]}'
data = self.fetch(f'{self.host}{path}', headers=self.headers()).json()['encData']
v = self.aes(data)
d=f'{v["title"]}$auth_key={v["authKey"]}&path={v["videoUrl"]}'
vod = {'vod_name': v["title"], 'type_name': ''.join(v.get('tagTitles',[])),'vod_play_from': v.get('nickName') or "小红书官方", 'vod_play_url': d}
result = {"list": [vod]}
return result
def searchContent(self, key, quick, pg='1'):
pass
def playerContent(self, flag, id, vipFlags):
h=self.headers()
h['Authorization'] = h.pop('aut')
del h['deviceid']
result = {"parse": 0, "url": f"{self.host}/api/m3u8/decode/authPath?{id}", "header": h}
return result
def localProxy(self, param):
return self.action(param)
def aes(self, word):
key = b64decode("SmhiR2NpT2lKSVV6STFOaQ==")
iv = key
cipher = AES.new(key, AES.MODE_CBC, iv)
decrypted = unpad(cipher.decrypt(b64decode(word)), AES.block_size)
return json.loads(decrypted.decode('utf-8'))
def dtim(self, seconds):
try:
seconds = int(seconds)
hours = seconds // 3600
remaining_seconds = seconds % 3600
minutes = remaining_seconds // 60
remaining_seconds = remaining_seconds % 60
formatted_minutes = str(minutes).zfill(2)
formatted_seconds = str(remaining_seconds).zfill(2)
if hours > 0:
formatted_hours = str(hours).zfill(2)
return f"{formatted_hours}:{formatted_minutes}:{formatted_seconds}"
else:
return f"{formatted_minutes}:{formatted_seconds}"
except:
return ''
def getsign(self):
t=str(int(time.time() * 1000))
return self.md5(t[3:8])
def gettoken(self):
url = f'{self.host}/api/user/traveler'
headers = {
'User-Agent': 'Mozilla/5.0 (Linux; Android 11; M2012K10C Build/RP1A.200720.011; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/87.0.4280.141 Mobile Safari/537.36;SuiRui/xhs/ver=1.2.6',
'deviceid': self.did, 't': str(int(time.time() * 1000)), 's': self.getsign(), }
data = {'deviceId': self.did, 'tt': 'U', 'code': '', 'chCode': 'dafe13'}
data1 = self.post(url, json=data, headers=headers).json()
data2 = data1['data']
return data2['token'], data2['imgDomain']
host = 'https://jhfkdnov21vfd.fhoumpjjih.work'
def headers(self):
henda = {
'User-Agent': 'Mozilla/5.0 (Linux; Android 11; M2012K10C Build/RP1A.200720.011; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/87.0.4280.141 Mobile Safari/537.36;SuiRui/xhs/ver=1.2.6',
'deviceid': self.did, 't': str(int(time.time() * 1000)), 's': self.getsign(), 'aut': self.token}
return henda
def action(self, param):
headers = {
'User-Agent': 'Dalvik/2.1.0 (Linux; U; Android 11; M2012K10C Build/RP1A.200720.011)'}
data = self.fetch(f'{self.phost}{param["url"]}', headers=headers)
type=data.headers.get('Content-Type').split(';')[0]
base64_data = self.img(data.content, 100, '2020-zq3-888')
return [200, type, base64_data]
def img(self, data: bytes, length: int, key: str):
GIF = b'\x47\x49\x46'
JPG = b'\xFF\xD8\xFF'
PNG = b'\x89\x50\x4E\x47\x0D\x0A\x1A\x0A'
def is_dont_need_decode_for_gif(data):
return len(data) > 2 and data[:3] == GIF
def is_dont_need_decode_for_jpg(data):
return len(data) > 7 and data[:3] == JPG
def is_dont_need_decode_for_png(data):
return len(data) > 7 and data[1:8] == PNG[1:8]
if is_dont_need_decode_for_png(data):
return data
elif is_dont_need_decode_for_gif(data):
return data
elif is_dont_need_decode_for_jpg(data):
return data
else:
key_bytes = key.encode('utf-8')
result = bytearray(data)
for i in range(length):
result[i] ^= key_bytes[i % len(key_bytes)]
return bytes(result)

217
py/推特.py Normal file
View File

@ -0,0 +1,217 @@
# coding=utf-8
# !/usr/bin/python
# by嗷呜
import json
import sys
import time
from base64 import b64decode
from urllib.parse import quote
from Crypto.Cipher import AES
from Crypto.Hash import MD5
from Crypto.Util.Padding import unpad
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def getName(self):
return "tuit"
def init(self, extend=""):
self.did = MD5.new((self.t).encode()).hexdigest()
self.token = self.gettoken()
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def action(self, action):
pass
def destroy(self):
pass
def aes(self, word):
key = b64decode("SmhiR2NpT2lKSVV6STFOaQ==")
iv = key
cipher = AES.new(key, AES.MODE_CBC, iv)
decrypted = unpad(cipher.decrypt(b64decode(word)), AES.block_size)
return json.loads(decrypted.decode('utf-8'))
def dtim(self, seconds):
seconds = int(seconds)
hours = seconds // 3600
remaining_seconds = seconds % 3600
minutes = remaining_seconds // 60
remaining_seconds = remaining_seconds % 60
formatted_minutes = str(minutes).zfill(2)
formatted_seconds = str(remaining_seconds).zfill(2)
if hours > 0:
formatted_hours = str(hours).zfill(2)
return f"{formatted_hours}:{formatted_minutes}:{formatted_seconds}"
else:
return f"{formatted_minutes}:{formatted_seconds}"
def gettoken(self):
url = 'https://d1frehx187fm2c.cloudfront.net/api/user/traveler'
headers = {
'User-Agent': 'Mozilla/5.0 (Linux; Android 11; M2012K10C Build/RP1A.200720.011; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/87.0.4280.141 Mobile Safari/537.36;SuiRui/twitter/ver=1.3.4',
'deviceid': self.did, 't': self.t, 's': self.sign, }
data = {'deviceId': self.did, 'tt': 'U', 'code': '', 'chCode': ''}
data1 = self.post(url, json=data, headers=headers).json()
token = data1['data']['token']
return token
t = str(int(time.time() * 1000))
sign = MD5.new((t[3:8]).encode()).hexdigest()
host = 'https://api.wcyfhknomg.work'
def headers(self):
henda = {
'User-Agent': 'Mozilla/5.0 (Linux; Android 11; M2012K10C Build/RP1A.200720.011; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/87.0.4280.141 Mobile Safari/537.36;SuiRui/twitter/ver=1.3.4',
'deviceid': self.did, 't': self.t, 's': self.sign, 'aut': self.token}
return henda
def homeContent(self, filter):
data = self.fetch(f'{self.host}/api/video/classifyList', headers=self.headers()).json()['encData']
data1 = self.aes(data)
result = {'filters': {"1": [{"key": "fl", "name": "分类",
"value": [{"n": "最近更新", "v": "1"}, {"n": "最多播放", "v": "2"},
{"n": "好评榜", "v": "3"}]}], "2": [{"key": "fl", "name": "分类",
"value": [
{"n": "最近更新", "v": "1"},
{"n": "最多播放", "v": "2"},
{"n": "好评榜", "v": "3"}]}],
"3": [{"key": "fl", "name": "分类",
"value": [{"n": "最近更新", "v": "1"}, {"n": "最多播放", "v": "2"},
{"n": "好评榜", "v": "3"}]}], "4": [{"key": "fl", "name": "分类",
"value": [
{"n": "最近更新", "v": "1"},
{"n": "最多播放", "v": "2"},
{"n": "好评榜", "v": "3"}]}],
"5": [{"key": "fl", "name": "分类",
"value": [{"n": "最近更新", "v": "1"}, {"n": "最多播放", "v": "2"},
{"n": "好评榜", "v": "3"}]}], "6": [{"key": "fl", "name": "分类",
"value": [
{"n": "最近更新", "v": "1"},
{"n": "最多播放", "v": "2"},
{"n": "好评榜", "v": "3"}]}],
"7": [{"key": "fl", "name": "分类",
"value": [{"n": "最近更新", "v": "1"}, {"n": "最多播放", "v": "2"},
{"n": "好评榜", "v": "3"}]}], "jx": [{"key": "type", "name": "精选",
"value": [{"n": "日榜", "v": "1"},
{"n": "周榜", "v": "2"},
{"n": "月榜", "v": "3"},
{"n": "总榜",
"v": "4"}]}]}}
classes = [{'type_name': "精选", 'type_id': "jx"}]
for k in data1['data']:
classes.append({'type_name': k['classifyTitle'], 'type_id': k['classifyId']})
result['class'] = classes
return result
def homeVideoContent(self):
pass
def categoryContent(self, tid, pg, filter, extend):
path = f'/api/video/queryVideoByClassifyId?pageSize=20&page={pg}&classifyId={tid}&sortType={extend.get("fl", "1")}'
if 'click' in tid:
path = f'/api/video/queryPersonVideoByType?pageSize=20&page={pg}&userId={tid.replace("click", "")}'
if tid == 'jx':
path = f'/api/video/getRankVideos?pageSize=20&page={pg}&type={extend.get("type", "1")}'
data = self.fetch(f'{self.host}{path}', headers=self.headers()).json()['encData']
data1 = self.aes(data)['data']
result = {}
videos = []
for k in data1:
img = 'https://dg2ordyr4k5v3.cloudfront.net/' + k.get('coverImg')[0]
id = f'{k.get("videoId")}?{k.get("userId")}?{k.get("nickName")}'
if 'click' in tid:
id = id + 'click'
videos.append({"vod_id": id, 'vod_name': k.get('title'), 'vod_pic': self.getProxyUrl() + '&url=' + img,
'vod_remarks': self.dtim(k.get('playTime')),'style': {"type": "rect", "ratio": 1.33}})
result["list"] = videos
result["page"] = pg
result["pagecount"] = 9999
result["limit"] = 90
result["total"] = 999999
return result
def detailContent(self, ids):
vid = ids[0].replace('click', '').split('?')
path = f'/api/video/can/watch?videoId={vid[0]}'
data = self.fetch(f'{self.host}{path}', headers=self.headers()).json()['encData']
data1 = self.aes(data)['playPath']
clj = '[a=cr:' + json.dumps({'id': vid[1] + 'click', 'name': vid[2]}) + '/]' + vid[2] + '[/a]'
if 'click' in ids[0]:
clj = vid[2]
vod = {'vod_director': clj, 'vod_play_from': "推特", 'vod_play_url': vid[2] + "$" + data1}
result = {"list": [vod]}
return result
def searchContent(self, key, quick, pg='1'):
path = f'/api/search/keyWord?pageSize=20&page={pg}&searchWord={quote(key)}&searchType=1'
data = self.fetch(f'{self.host}{path}', headers=self.headers()).json()['encData']
data1 = self.aes(data)['videoList']
result = {}
videos = []
for k in data1:
img = 'https://dg2ordyr4k5v3.cloudfront.net/' + k.get('coverImg')[0]
id = f'{k.get("videoId")}?{k.get("userId")}?{k.get("nickName")}'
videos.append({"vod_id": id, 'vod_name': k.get('title'), 'vod_pic': self.getProxyUrl() + '&url=' + img,
'vod_remarks': self.dtim(k.get('playTime')), 'style': {"type": "rect", "ratio": 1.33}})
result["list"] = videos
result["page"] = pg
result["pagecount"] = 9999
result["limit"] = 90
result["total"] = 999999
return result
def playerContent(self, flag, id, vipFlags):
result = {"parse": 0, "url": id, "header": {'User-Agent': 'Mozilla/5.0 (Linux; Android 11; M2012K10C Build/RP1A.200720.011; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/87.0.4280.141 Mobile Safari/537.36;SuiRui/twitter/ver=1.3.4'}}
return result
def localProxy(self, param):
return self.imgs(param)
def imgs(self, param):
headers = {
'User-Agent': 'Mozilla/5.0 (Linux; Android 11; M2012K10C Build/RP1A.200720.011; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/87.0.4280.141 Mobile Safari/537.36;SuiRui/twitter/ver=1.3.4'}
url = param['url']
type = url.split('.')[-1].split('_')[0]
data = self.fetch(url,headers=headers).content
bdata = self.img(data, 100, '2020-zq3-888')
return [200, f'image/{type}', bdata]
def img(self, data: bytes, length: int, key: str):
GIF = b'\x47\x49\x46'
JPG = b'\xFF\xD8\xFF'
PNG = b'\x89\x50\x4E\x47\x0D\x0A\x1A\x0A'
def is_dont_need_decode_for_gif(data):
return len(data) > 2 and data[:3] == GIF
def is_dont_need_decode_for_jpg(data):
return len(data) > 7 and data[:3] == JPG
def is_dont_need_decode_for_png(data):
return len(data) > 7 and data[1:8] == PNG[1:8]
if is_dont_need_decode_for_png(data):
return data
elif is_dont_need_decode_for_gif(data):
return data
elif is_dont_need_decode_for_jpg(data):
return data
else:
key_bytes = key.encode('utf-8')
result = bytearray(data)
for i in range(length):
result[i] ^= key_bytes[i % len(key_bytes)]
return bytes(result)