Update app.py
This commit is contained in:
parent
8fc6058ca8
commit
efdc5e168a
54
app.py
54
app.py
@ -2,6 +2,7 @@ import re
|
|||||||
import requests
|
import requests
|
||||||
import os
|
import os
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
import time
|
||||||
|
|
||||||
# ================== 配置区域 ==================
|
# ================== 配置区域 ==================
|
||||||
# 需要删除的分组 (这些分组下的频道会被过滤)
|
# 需要删除的分组 (这些分组下的频道会被过滤)
|
||||||
@ -25,6 +26,9 @@ M3U_SOURCES = [
|
|||||||
{"name": "MyIPTV", "url": "https://git.gra.phite.ro/alantang/auto-iptv/raw/branch/main/live_ipv6.m3u", "ua": "okhttp/4.12.0"}
|
{"name": "MyIPTV", "url": "https://git.gra.phite.ro/alantang/auto-iptv/raw/branch/main/live_ipv6.m3u", "ua": "okhttp/4.12.0"}
|
||||||
]
|
]
|
||||||
|
|
||||||
|
# 最大响应时间(秒),超过此时间的频道将被视为异常
|
||||||
|
MAX_RESPONSE_TIME = 5
|
||||||
|
|
||||||
# ================== 核心功能 ==================
|
# ================== 核心功能 ==================
|
||||||
def robust_download(url, ua, max_retries=3):
|
def robust_download(url, ua, max_retries=3):
|
||||||
headers = {'User-Agent': ua}
|
headers = {'User-Agent': ua}
|
||||||
@ -35,12 +39,10 @@ def robust_download(url, ua, max_retries=3):
|
|||||||
response.encoding = response.apparent_encoding
|
response.encoding = response.apparent_encoding
|
||||||
return response.text
|
return response.text
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
if attempt == max_retries - 1: raise
|
if attempt == max_retries - 1:
|
||||||
print(f"正在重试 {url} (第 {attempt+1} 次)")
|
raise
|
||||||
|
print(f"正在重试 {url} (第 {attempt + 1} 次)")
|
||||||
|
|
||||||
def process_channel(line):
|
|
||||||
if any(f'group-title="{g}"' in line for g in DELETE_GROUPS):
|
|
||||||
return None
|
|
||||||
|
|
||||||
def process_channel(line):
|
def process_channel(line):
|
||||||
"""频道信息处理流水线"""
|
"""频道信息处理流水线"""
|
||||||
@ -71,6 +73,7 @@ def process_channel(line):
|
|||||||
|
|
||||||
return line
|
return line
|
||||||
|
|
||||||
|
|
||||||
def parse_m3u(content):
|
def parse_m3u(content):
|
||||||
channels = []
|
channels = []
|
||||||
current_channel = {}
|
current_channel = {}
|
||||||
@ -84,6 +87,30 @@ def parse_m3u(content):
|
|||||||
current_channel = {}
|
current_channel = {}
|
||||||
return channels
|
return channels
|
||||||
|
|
||||||
|
|
||||||
|
def measure_response_time(url, ua):
|
||||||
|
headers = {'User-Agent': ua}
|
||||||
|
try:
|
||||||
|
start_time = time.time()
|
||||||
|
response = requests.head(url, headers=headers, timeout=MAX_RESPONSE_TIME)
|
||||||
|
response.raise_for_status()
|
||||||
|
return time.time() - start_time
|
||||||
|
except Exception:
|
||||||
|
return float('inf')
|
||||||
|
|
||||||
|
|
||||||
|
def sort_channels_by_response_time(channels):
|
||||||
|
sorted_channels = []
|
||||||
|
for channel in channels:
|
||||||
|
url = channel["url"]
|
||||||
|
ua = M3U_SOURCES[0]["ua"] # 假设所有源使用相同的UA
|
||||||
|
response_time = measure_response_time(url, ua)
|
||||||
|
if response_time < MAX_RESPONSE_TIME:
|
||||||
|
sorted_channels.append((response_time, channel))
|
||||||
|
sorted_channels.sort(key=lambda x: x[0])
|
||||||
|
return [channel for _, channel in sorted_channels]
|
||||||
|
|
||||||
|
|
||||||
def generate_m3u_output(channels):
|
def generate_m3u_output(channels):
|
||||||
"""生成M3U格式内容"""
|
"""生成M3U格式内容"""
|
||||||
group_dict = defaultdict(list)
|
group_dict = defaultdict(list)
|
||||||
@ -101,11 +128,13 @@ def generate_m3u_output(channels):
|
|||||||
|
|
||||||
output = ["#EXTM3U"]
|
output = ["#EXTM3U"]
|
||||||
for group, items in ordered_groups:
|
for group, items in ordered_groups:
|
||||||
for item in items:
|
sorted_items = sort_channels_by_response_time(items)
|
||||||
|
for item in sorted_items:
|
||||||
output.append(item["meta"])
|
output.append(item["meta"])
|
||||||
output.append(item["url"])
|
output.append(item["url"])
|
||||||
return "\n".join(output)
|
return "\n".join(output)
|
||||||
|
|
||||||
|
|
||||||
def generate_txt_output(channels):
|
def generate_txt_output(channels):
|
||||||
"""生成TXT格式内容(分组名称,频道名称,URL)"""
|
"""生成TXT格式内容(分组名称,频道名称,URL)"""
|
||||||
txt_lines = []
|
txt_lines = []
|
||||||
@ -126,19 +155,25 @@ def generate_txt_output(channels):
|
|||||||
|
|
||||||
return "\n".join(txt_lines)
|
return "\n".join(txt_lines)
|
||||||
|
|
||||||
|
|
||||||
def save_file(content, filename):
|
def save_file(content, filename):
|
||||||
"""通用文件保存函数"""
|
"""通用文件保存函数"""
|
||||||
|
# 创建 live 文件夹,如果不存在
|
||||||
|
if not os.path.exists('live'):
|
||||||
|
os.makedirs('live')
|
||||||
try:
|
try:
|
||||||
with open(filename, "w", encoding="utf-8") as f:
|
file_path = os.path.join('live', filename)
|
||||||
|
with open(file_path, "w", encoding="utf-8") as f:
|
||||||
f.write(content)
|
f.write(content)
|
||||||
print(f"✅ 成功生成 {filename} 文件")
|
print(f"✅ 成功生成 {file_path} 文件")
|
||||||
return True
|
return True
|
||||||
except PermissionError:
|
except PermissionError:
|
||||||
print(f"❌ 无写入权限: {filename}")
|
print(f"❌ 无写入权限: {file_path}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"❌ 保存文件失败: {str(e)}")
|
print(f"❌ 保存文件失败: {str(e)}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
all_channels = []
|
all_channels = []
|
||||||
|
|
||||||
@ -169,5 +204,6 @@ def main():
|
|||||||
|
|
||||||
print(f"\n处理完成!有效频道总数: {len(all_channels)}")
|
print(f"\n处理完成!有效频道总数: {len(all_channels)}")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
Loading…
Reference in New Issue
Block a user