Update app.py

This commit is contained in:
alantang 2025-03-25 14:13:44 +08:00 committed by GitHub
parent 72c6ea2331
commit 51baffde3e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

98
app.py
View File

@ -6,25 +6,12 @@ from collections import defaultdict
# ================== 配置区域 ==================
# 需要删除的分组 (这些分组下的频道会被过滤)
DELETE_GROUPS = ["4K频道", "8K频道"]
# 需要转换为地方频道的省份
PROVINCE_GROUPS = ["北京", "安徽", "甘肃", "广东", "贵州", "海南", "河北", "河南", "黑龙江", "湖北", "湖南",
"吉林", "江苏", "江西", "辽宁", "青海", "山东", "上海", "四川", "云南", "浙江", "重庆", "香港"]
# 分组名称替换规则
GROUP_REPLACEMENTS = {
"央视": "央视频道",
"卫视": "卫视频道",
"其他": "其他频道"
}
# 需要删除的频道名称冗余字符
GROUP_REPLACEMENTS = {"央视": "央视频道", "卫视": "卫视频道", "其他": "其他频道"}
DELETE_CHARS = ["iHOT-", "NewTV-", "SiTV-", "-HEVC", "-50-FPS", "-高码", "-4K", "-BPTV", "咪咕视频_8M1080_"]
# 最终分组排序规则
GROUP_ORDER = ["收藏频道", "央视频道", "卫视频道", "其他频道", "地方频道"]
# 数据源配置 (包含重试机制)
M3U_SOURCES = [
{"name": "aktv", "url": "https://gh.tryxd.cn/https://raw.githubusercontent.com/alantang1977/JunTV/main/output/result.m3u", "ua": "okhttp/4.12.0"},
{"name": "自用收藏", "url": "http://aktv.space/live.m3u", "ua": "okhttp/4.12.0"},
@ -39,19 +26,21 @@ M3U_SOURCES = [
# ================== 核心功能 ==================
def robust_download(url, ua, max_retries=3):
"""带重试机制的下载函数"""
headers = {'User-Agent': ua}
for attempt in range(max_retries):
try:
response = requests.get(url, headers=headers, timeout=15)
response.raise_for_status()
response.encoding = response.apparent_encoding # 自动检测编码
response.encoding = response.apparent_encoding
return response.text
except Exception as e:
if attempt == max_retries - 1:
raise
if attempt == max_retries - 1: raise
print(f"正在重试 {url} (第 {attempt+1} 次)")
def process_channel(line):
if any(f'group-title="{g}"' in line for g in DELETE_GROUPS):
return None
def process_channel(line):
"""频道信息处理流水线"""
# 过滤不需要的分组
@ -82,10 +71,8 @@ def process_channel(line):
return line
def parse_m3u(content):
"""解析M3U内容并结构化存储"""
channels = []
current_channel = {}
for line in content.splitlines():
line = line.strip()
if line.startswith("#EXTINF"):
@ -96,26 +83,21 @@ def parse_m3u(content):
current_channel = {}
return channels
def generate_output(channels):
"""生成排序后的最终内容"""
# 按分组归类
def generate_m3u_output(channels):
"""生成M3U格式内容"""
group_dict = defaultdict(list)
for channel in channels:
if match := re.search(r'group-title="([^"]+)"', channel["meta"]):
group = match.group(1)
group_dict[group].append(channel)
# 按自定义顺序排序
ordered_groups = []
for group in GROUP_ORDER:
if group in group_dict:
ordered_groups.append((group, group_dict.pop(group)))
# 添加剩余分组并按字母排序
for group in sorted(group_dict.keys()):
ordered_groups.append((group, group_dict[group]))
# 生成最终文本
output = ["#EXTM3U"]
for group, items in ordered_groups:
for item in items:
@ -123,8 +105,40 @@ def generate_output(channels):
output.append(item["url"])
return "\n".join(output)
def generate_txt_output(channels):
"""生成TXT格式内容分组名称,频道名称,URL"""
txt_lines = []
for channel in channels:
# 提取分组名称
group_match = re.search(r'group-title="([^"]+)"', channel["meta"])
group = group_match.group(1) if group_match else "未知分组"
# 提取频道名称(最后一个逗号后的内容)
channel_name = re.split(r',(?![^"]*\"\,)', channel["meta"])[-1].strip()
# 清洗频道名称
for char in DELETE_CHARS:
channel_name = channel_name.replace(char, "")
# 添加条目
txt_lines.append(f"{group},{channel_name},{channel['url']}")
return "\n".join(txt_lines)
def save_file(content, filename):
"""通用文件保存函数"""
try:
with open(filename, "w", encoding="utf-8") as f:
f.write(content)
print(f"✅ 成功生成 {filename} 文件")
return True
except PermissionError:
print(f"❌ 无写入权限: {filename}")
except Exception as e:
print(f"❌ 保存文件失败: {str(e)}")
return False
def main():
"""主工作流程"""
all_channels = []
print("开始下载和处理数据源...")
@ -139,28 +153,20 @@ def main():
processed.append({"meta": cleaned_meta, "url": ch["url"]})
all_channels.extend(processed)
print(f"[✓] 成功处理 {source['name']} ({len(processed)} 频道)")
print(f"[✓] {source['name']} 处理完成 ({len(processed)} 频道)")
except Exception as e:
print(f"[×] 处理 {source['name']} 失败: {str(e)}")
print(f"[×] {source['name']} 失败: {str(e)}")
print("生成最终文件...")
final_content = generate_output(all_channels)
print("\n生成最终文件...")
# 生成M3U文件
m3u_content = generate_m3u_output(all_channels)
save_file(m3u_content, "live.m3u")
try:
with open("live.txt", "w", encoding="utf-8") as f:
f.write(final_content)
print("生成 live.txt 成功!")
except Exception as e:
print(f"生成 live.txt 失败: {str(e)}")
# 生成TXT文件
txt_content = generate_txt_output(all_channels)
save_file(txt_content, "live.txt")
try:
with open("live.m3u", "w", encoding="utf-8") as f:
f.write(final_content)
print("生成 live.m3u 成功!")
except Exception as e:
print(f"生成 live.m3u 失败: {str(e)}")
print(f"共处理 {len(all_channels)} 个频道,文件大小: {len(final_content)//1024}KB")
print(f"\n处理完成!有效频道总数: {len(all_channels)}")
if __name__ == "__main__":
main()