diff --git a/app.py b/app.py index 862e8704..0f0fcf0f 100644 --- a/app.py +++ b/app.py @@ -2,6 +2,7 @@ import re import requests import os from collections import defaultdict +import time # ================== 配置区域 ================== # 需要删除的分组 (这些分组下的频道会被过滤) @@ -14,7 +15,7 @@ GROUP_ORDER = ["收藏频道", "央视频道", "卫视频道", "其他频道", " M3U_SOURCES = [ {"name": "aktv", "url": "https://git.gra.phite.ro/alantang/tvbs/raw/branch/main/output/result.m3u", "ua": "okhttp/4.12.0"}, - {"name": "mytv", "url": "https://codeberg.org/sy147258/iptv/raw/branch/main/电视", "ua": "okhttp/4.12.0"}, + {"name": "mytv", "url": "https://codeberg.org/sy147258/iptv/raw/branch/main/电视", "ua": "okhttp/4.12.0"}, {"name": "自用收藏", "url": "http://aktv.space/live.m3u", "ua": "okhttp/4.12.0"}, {"name": "big", "url": "https://git.gra.phite.ro/alantang/auto-iptv/raw/branch/main/live_ipv4.m3u", "ua": "okhttp/4.12.0"}, {"name": "xhztv", "url": "http://xhztv.top/new.txt", "ua": "okhttp/4.12.0"}, @@ -25,6 +26,9 @@ M3U_SOURCES = [ {"name": "MyIPTV", "url": "https://git.gra.phite.ro/alantang/auto-iptv/raw/branch/main/live_ipv6.m3u", "ua": "okhttp/4.12.0"} ] +# 最大响应时间(秒),超过此时间的频道将被视为异常 +MAX_RESPONSE_TIME = 5 + # ================== 核心功能 ================== def robust_download(url, ua, max_retries=3): headers = {'User-Agent': ua} @@ -35,12 +39,10 @@ def robust_download(url, ua, max_retries=3): response.encoding = response.apparent_encoding return response.text except Exception as e: - if attempt == max_retries - 1: raise - print(f"正在重试 {url} (第 {attempt+1} 次)") + if attempt == max_retries - 1: + raise + print(f"正在重试 {url} (第 {attempt + 1} 次)") -def process_channel(line): - if any(f'group-title="{g}"' in line for g in DELETE_GROUPS): - return None def process_channel(line): """频道信息处理流水线""" @@ -65,12 +67,13 @@ def process_channel(line): # 清洗频道名称 for char in DELETE_CHARS: line = line.replace(char, "") - + # 标准化CCTV写法 line = re.sub(r'cctv-?', 'CCTV', line, flags=re.IGNORECASE) - + return line + def parse_m3u(content): channels = [] current_channel = {} @@ -84,6 +87,30 @@ def parse_m3u(content): current_channel = {} return channels + +def measure_response_time(url, ua): + headers = {'User-Agent': ua} + try: + start_time = time.time() + response = requests.head(url, headers=headers, timeout=MAX_RESPONSE_TIME) + response.raise_for_status() + return time.time() - start_time + except Exception: + return float('inf') + + +def sort_channels_by_response_time(channels): + sorted_channels = [] + for channel in channels: + url = channel["url"] + ua = M3U_SOURCES[0]["ua"] # 假设所有源使用相同的UA + response_time = measure_response_time(url, ua) + if response_time < MAX_RESPONSE_TIME: + sorted_channels.append((response_time, channel)) + sorted_channels.sort(key=lambda x: x[0]) + return [channel for _, channel in sorted_channels] + + def generate_m3u_output(channels): """生成M3U格式内容""" group_dict = defaultdict(list) @@ -101,11 +128,13 @@ def generate_m3u_output(channels): output = ["#EXTM3U"] for group, items in ordered_groups: - for item in items: + sorted_items = sort_channels_by_response_time(items) + for item in sorted_items: output.append(item["meta"]) output.append(item["url"]) return "\n".join(output) + def generate_txt_output(channels): """生成TXT格式内容(分组名称,频道名称,URL)""" txt_lines = [] @@ -113,61 +142,68 @@ def generate_txt_output(channels): # 提取分组名称 group_match = re.search(r'group-title="([^"]+)"', channel["meta"]) group = group_match.group(1) if group_match else "未知分组" - + # 提取频道名称(最后一个逗号后的内容) channel_name = re.split(r',(?![^"]*\"\,)', channel["meta"])[-1].strip() - + # 清洗频道名称 for char in DELETE_CHARS: channel_name = channel_name.replace(char, "") - + # 添加条目 txt_lines.append(f"{group},{channel_name},{channel['url']}") - + return "\n".join(txt_lines) + def save_file(content, filename): """通用文件保存函数""" + # 创建 live 文件夹,如果不存在 + if not os.path.exists('live'): + os.makedirs('live') try: - with open(filename, "w", encoding="utf-8") as f: + file_path = os.path.join('live', filename) + with open(file_path, "w", encoding="utf-8") as f: f.write(content) - print(f"✅ 成功生成 {filename} 文件") + print(f"✅ 成功生成 {file_path} 文件") return True except PermissionError: - print(f"❌ 无写入权限: {filename}") + print(f"❌ 无写入权限: {file_path}") except Exception as e: print(f"❌ 保存文件失败: {str(e)}") return False + def main(): all_channels = [] - + print("开始下载和处理数据源...") for source in M3U_SOURCES: try: content = robust_download(source["url"], source["ua"]) channels = parse_m3u(content) - + processed = [] for ch in channels: if cleaned_meta := process_channel(ch["meta"]): processed.append({"meta": cleaned_meta, "url": ch["url"]}) - + all_channels.extend(processed) print(f"[✓] {source['name']} 处理完成 ({len(processed)} 频道)") except Exception as e: print(f"[×] {source['name']} 失败: {str(e)}") - + print("\n生成最终文件...") # 生成M3U文件 m3u_content = generate_m3u_output(all_channels) save_file(m3u_content, "live.m3u") - + # 生成TXT文件 txt_content = generate_txt_output(all_channels) save_file(txt_content, "live.txt") - + print(f"\n处理完成!有效频道总数: {len(all_channels)}") + if __name__ == "__main__": main()