From 3f509cd977955fdf47dc0a555c7c69ec4a77d2bd Mon Sep 17 00:00:00 2001 From: alantang <107459091+alantang1977@users.noreply.github.com> Date: Fri, 11 Apr 2025 15:46:49 +0800 Subject: [PATCH] Update app.py --- app.py | 81 ++++++++++++++++------------------------------------------ 1 file changed, 22 insertions(+), 59 deletions(-) diff --git a/app.py b/app.py index 014a1805..037c8e32 100644 --- a/app.py +++ b/app.py @@ -2,8 +2,6 @@ import re import requests import os from collections import defaultdict -import time -from concurrent.futures import ThreadPoolExecutor # ================== 配置区域 ================== # 需要删除的分组 (这些分组下的频道会被过滤) @@ -27,11 +25,6 @@ M3U_SOURCES = [ {"name": "MyIPTV", "url": "https://git.gra.phite.ro/alantang/auto-iptv/raw/branch/main/live_ipv6.m3u", "ua": "okhttp/4.12.0"} ] -# 最大响应时间(秒),超过此时间的频道将被视为异常 -MAX_RESPONSE_TIME = 5 -# 线程池最大工作线程数 -MAX_WORKERS = 10 - # ================== 核心功能 ================== def robust_download(url, ua, max_retries=3): headers = {'User-Agent': ua} @@ -42,10 +35,12 @@ def robust_download(url, ua, max_retries=3): response.encoding = response.apparent_encoding return response.text except Exception as e: - if attempt == max_retries - 1: - raise - print(f"正在重试 {url} (第 {attempt + 1} 次)") + if attempt == max_retries - 1: raise + print(f"正在重试 {url} (第 {attempt+1} 次)") +def process_channel(line): + if any(f'group-title="{g}"' in line for g in DELETE_GROUPS): + return None def process_channel(line): """频道信息处理流水线""" @@ -70,13 +65,12 @@ def process_channel(line): # 清洗频道名称 for char in DELETE_CHARS: line = line.replace(char, "") - + # 标准化CCTV写法 line = re.sub(r'cctv-?', 'CCTV', line, flags=re.IGNORECASE) - + return line - def parse_m3u(content): channels = [] current_channel = {} @@ -90,32 +84,6 @@ def parse_m3u(content): current_channel = {} return channels - -def measure_response_time(url, ua): - headers = {'User-Agent': ua} - try: - start_time = time.time() - response = requests.head(url, headers=headers, timeout=MAX_RESPONSE_TIME) - response.raise_for_status() - return time.time() - start_time - except Exception: - return float('inf') - - -def sort_channels_by_response_time(channels): - def measure_and_sort(channel): - url = channel["url"] - ua = M3U_SOURCES[0]["ua"] # 假设所有源使用相同的UA - response_time = measure_response_time(url, ua) - return response_time, channel - - with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: - results = list(executor.map(measure_and_sort, channels)) - valid_results = [result for result in results if result[0] < MAX_RESPONSE_TIME] - valid_results.sort(key=lambda x: x[0]) - return [channel for _, channel in valid_results] - - def generate_m3u_output(channels): """生成M3U格式内容""" group_dict = defaultdict(list) @@ -133,13 +101,11 @@ def generate_m3u_output(channels): output = ["#EXTM3U"] for group, items in ordered_groups: - sorted_items = sort_channels_by_response_time(items) - for item in sorted_items: + for item in items: output.append(item["meta"]) output.append(item["url"]) return "\n".join(output) - def generate_txt_output(channels): """生成TXT格式内容(分组名称,频道名称,URL)""" txt_lines = [] @@ -147,20 +113,19 @@ def generate_txt_output(channels): # 提取分组名称 group_match = re.search(r'group-title="([^"]+)"', channel["meta"]) group = group_match.group(1) if group_match else "未知分组" - + # 提取频道名称(最后一个逗号后的内容) channel_name = re.split(r',(?![^"]*\"\,)', channel["meta"])[-1].strip() - + # 清洗频道名称 for char in DELETE_CHARS: channel_name = channel_name.replace(char, "") - + # 添加条目 txt_lines.append(f"{group},{channel_name},{channel['url']}") - + return "\n".join(txt_lines) - def save_file(content, filename): """通用文件保存函数""" # 创建 live 文件夹,如果不存在 @@ -169,47 +134,45 @@ def save_file(content, filename): os.makedirs(live_folder) file_path = os.path.join(live_folder, filename) try: - with open(file_path, "w", encoding="utf-8") as f: + with open(filename, "w", encoding="utf-8") as f: f.write(content) - print(f"✅ 成功生成 {file_path} 文件") + print(f"✅ 成功生成 {filename} 文件") return True except PermissionError: - print(f"❌ 无写入权限: {file_path}") + print(f"❌ 无写入权限: {filename}") except Exception as e: print(f"❌ 保存文件失败: {str(e)}") return False - def main(): all_channels = [] - + print("开始下载和处理数据源...") for source in M3U_SOURCES: try: content = robust_download(source["url"], source["ua"]) channels = parse_m3u(content) - + processed = [] for ch in channels: if cleaned_meta := process_channel(ch["meta"]): processed.append({"meta": cleaned_meta, "url": ch["url"]}) - + all_channels.extend(processed) print(f"[✓] {source['name']} 处理完成 ({len(processed)} 频道)") except Exception as e: print(f"[×] {source['name']} 失败: {str(e)}") - + print("\n生成最终文件...") # 生成M3U文件 m3u_content = generate_m3u_output(all_channels) save_file(m3u_content, "live.m3u") - + # 生成TXT文件 txt_content = generate_txt_output(all_channels) save_file(txt_content, "live.txt") - + print(f"\n处理完成!有效频道总数: {len(all_channels)}") - if __name__ == "__main__": - main() + main()