diff --git a/app.py b/app.py index 3e6f2105..32997127 100644 --- a/app.py +++ b/app.py @@ -6,25 +6,12 @@ from collections import defaultdict # ================== 配置区域 ================== # 需要删除的分组 (这些分组下的频道会被过滤) DELETE_GROUPS = ["4K频道", "8K频道"] - -# 需要转换为地方频道的省份 PROVINCE_GROUPS = ["北京", "安徽", "甘肃", "广东", "贵州", "海南", "河北", "河南", "黑龙江", "湖北", "湖南", "吉林", "江苏", "江西", "辽宁", "青海", "山东", "上海", "四川", "云南", "浙江", "重庆", "香港"] - -# 分组名称替换规则 -GROUP_REPLACEMENTS = { - "央视": "央视频道", - "卫视": "卫视频道", - "其他": "其他频道" -} - -# 需要删除的频道名称冗余字符 +GROUP_REPLACEMENTS = {"央视": "央视频道", "卫视": "卫视频道", "其他": "其他频道"} DELETE_CHARS = ["iHOT-", "NewTV-", "SiTV-", "-HEVC", "-50-FPS", "-高码", "-4K", "-BPTV", "咪咕视频_8M1080_"] - -# 最终分组排序规则 GROUP_ORDER = ["收藏频道", "央视频道", "卫视频道", "其他频道", "地方频道"] -# 数据源配置 (包含重试机制) M3U_SOURCES = [ {"name": "aktv", "url": "https://gh.tryxd.cn/https://raw.githubusercontent.com/alantang1977/JunTV/main/output/result.m3u", "ua": "okhttp/4.12.0"}, {"name": "自用收藏", "url": "http://aktv.space/live.m3u", "ua": "okhttp/4.12.0"}, @@ -39,19 +26,21 @@ M3U_SOURCES = [ # ================== 核心功能 ================== def robust_download(url, ua, max_retries=3): - """带重试机制的下载函数""" headers = {'User-Agent': ua} for attempt in range(max_retries): try: response = requests.get(url, headers=headers, timeout=15) response.raise_for_status() - response.encoding = response.apparent_encoding # 自动检测编码 + response.encoding = response.apparent_encoding return response.text except Exception as e: - if attempt == max_retries - 1: - raise + if attempt == max_retries - 1: raise print(f"正在重试 {url} (第 {attempt+1} 次)") +def process_channel(line): + if any(f'group-title="{g}"' in line for g in DELETE_GROUPS): + return None + def process_channel(line): """频道信息处理流水线""" # 过滤不需要的分组 @@ -82,10 +71,8 @@ def process_channel(line): return line def parse_m3u(content): - """解析M3U内容并结构化存储""" channels = [] current_channel = {} - for line in content.splitlines(): line = line.strip() if line.startswith("#EXTINF"): @@ -96,26 +83,21 @@ def parse_m3u(content): current_channel = {} return channels -def generate_output(channels): - """生成排序后的最终内容""" - # 按分组归类 +def generate_m3u_output(channels): + """生成M3U格式内容""" group_dict = defaultdict(list) for channel in channels: if match := re.search(r'group-title="([^"]+)"', channel["meta"]): group = match.group(1) group_dict[group].append(channel) - # 按自定义顺序排序 ordered_groups = [] for group in GROUP_ORDER: if group in group_dict: ordered_groups.append((group, group_dict.pop(group))) - - # 添加剩余分组并按字母排序 for group in sorted(group_dict.keys()): ordered_groups.append((group, group_dict[group])) - # 生成最终文本 output = ["#EXTM3U"] for group, items in ordered_groups: for item in items: @@ -123,8 +105,40 @@ def generate_output(channels): output.append(item["url"]) return "\n".join(output) +def generate_txt_output(channels): + """生成TXT格式内容(分组名称,频道名称,URL)""" + txt_lines = [] + for channel in channels: + # 提取分组名称 + group_match = re.search(r'group-title="([^"]+)"', channel["meta"]) + group = group_match.group(1) if group_match else "未知分组" + + # 提取频道名称(最后一个逗号后的内容) + channel_name = re.split(r',(?![^"]*\"\,)', channel["meta"])[-1].strip() + + # 清洗频道名称 + for char in DELETE_CHARS: + channel_name = channel_name.replace(char, "") + + # 添加条目 + txt_lines.append(f"{group},{channel_name},{channel['url']}") + + return "\n".join(txt_lines) + +def save_file(content, filename): + """通用文件保存函数""" + try: + with open(filename, "w", encoding="utf-8") as f: + f.write(content) + print(f"✅ 成功生成 {filename} 文件") + return True + except PermissionError: + print(f"❌ 无写入权限: {filename}") + except Exception as e: + print(f"❌ 保存文件失败: {str(e)}") + return False + def main(): - """主工作流程""" all_channels = [] print("开始下载和处理数据源...") @@ -139,28 +153,20 @@ def main(): processed.append({"meta": cleaned_meta, "url": ch["url"]}) all_channels.extend(processed) - print(f"[✓] 成功处理 {source['name']} ({len(processed)} 个频道)") + print(f"[✓] {source['name']} 处理完成 ({len(processed)} 频道)") except Exception as e: - print(f"[×] 处理 {source['name']} 失败: {str(e)}") + print(f"[×] {source['name']} 失败: {str(e)}") - print("生成最终文件...") - final_content = generate_output(all_channels) + print("\n生成最终文件...") + # 生成M3U文件 + m3u_content = generate_m3u_output(all_channels) + save_file(m3u_content, "live.m3u") - try: - with open("live.txt", "w", encoding="utf-8") as f: - f.write(final_content) - print("生成 live.txt 成功!") - except Exception as e: - print(f"生成 live.txt 失败: {str(e)}") + # 生成TXT文件 + txt_content = generate_txt_output(all_channels) + save_file(txt_content, "live.txt") - try: - with open("live.m3u", "w", encoding="utf-8") as f: - f.write(final_content) - print("生成 live.m3u 成功!") - except Exception as e: - print(f"生成 live.m3u 失败: {str(e)}") - - print(f"共处理 {len(all_channels)} 个频道,文件大小: {len(final_content)//1024}KB") + print(f"\n处理完成!有效频道总数: {len(all_channels)}") if __name__ == "__main__": main()