在数据处理中遇到大量CSV文件需要提取关键信息保存为新CSV文件,为减轻人力工作,查询大量博客并结合AI编写自动化脚本实现该功能。
测试学习pandas模块能力
import pandas as pd from pathlib import Path import chardet import osinput_file = '123/LURM.csv' #待处理文件#检测文件编码 with open(input_file, 'rb') as f:raw_data = f.read(10000) # 只读取前10000字节进行检测result = chardet.detect(raw_data)encoding = result['encoding']print(encoding)#导入文件 df = pd.read_csv(input_file, encoding=encoding) #print(df) print(f"原始数据列名: {list(df.columns)}")#提取指定列 columns_to_extract = ['vn', '报文类型', '采集时间', '上报时间', '车辆状态', '采集时间', '定位有效性', '纬度类型', '经度类型', '经度', '纬度'] extracted_df = df[columns_to_extract] print(extracted_df)#保存文件 output_file = 'extracted_result.csv' #extracted_df.to_csv(output_file, index=False, encoding='utf-8-sig') extracted_df.to_csv(output_file, index=False, encoding=encoding)
上面这段代码实现单文件信息提取并保存。要实现CSV文件处理需要使用到pandas模块,该模块是大数据文件处理较为热门模块。有编程基础的可以看注释,基本可以理解,如果没有建议使用AI工具指导。
正式处理程序
""" CSV文件提取保存程序代码第一版时间20250911 """import pandas as pd from pathlib import Path import chardet import osdef detect_encoding(file_path):"""检测文件编码"""try:with open(file_path, 'rb') as f:raw_data = f.read(10000) # 只读取前10000字节进行检测result = chardet.detect(raw_data)return result['encoding']except Exception as e:print(f"编码检测失败: {e}")return Nonedef read_csv_with_auto_encoding(file_path):"""自动检测编码并读取CSV文件"""# 常见的编码尝试顺序encodings_to_try = ['utf-8', 'gbk', 'gb2312', 'gb18030', 'latin1', 'cp1252']# 首先尝试自动检测编码detected_encoding = detect_encoding(file_path)if detected_encoding:encodings_to_try.insert(0, detected_encoding)for encoding in encodings_to_try:try:df = pd.read_csv(file_path, encoding=encoding)print(f"✓ 成功使用 {encoding} 编码读取文件")return df, encodingexcept UnicodeDecodeError:continueexcept Exception as e:# 如果不是编码问题,可能是其他错误if 'codec' not in str(e).lower():raise e# 如果所有编码都失败,尝试使用错误处理try:df = pd.read_csv(file_path, encoding='utf-8', errors='ignore')print("✓ 使用utf-8编码并忽略错误字符读取文件")return df, 'utf-8-with-errors-ignored'except Exception as e:raise Exception(f"所有编码尝试都失败: {e}") def save_dataframe_to_csv(extracted_df, file_name, encoding_used, output_folder="output"):"""保存提取后的DataFrame到CSV文件参数:extracted_df (pd.DataFrame): 要保存的数据框file_name (str): 原始文件名(不含扩展名)encoding_used (str): 使用的编码格式output_folder (str): 输出文件夹名,默认为"output"返回:bool: 保存成功返回True,失败返回False"""try:# 创建输出文件夹(如果不存在)output_path = Path(output_folder)output_path.mkdir(exist_ok=True)# 构建输出文件名:原文件名 + "o" + .csvoutput_file = output_path / f"{file_name}_o.csv"# 检查文件是否已存在if output_file.exists():raise FileExistsError(f"文件已存在: {output_file}")# 保存为CSV文件extracted_df.to_csv(output_file, index=False, encoding=encoding_used)print(f"✓ 数据已成功保存到: {output_file}")print(f" 使用编码: {encoding_used}")print(f" 数据形状: {extracted_df.shape[0]} 行, {extracted_df.shape[1]} 列")return Trueexcept FileExistsError as e:print(f"⚠️ 文件已存在错误: {e}")return Falseexcept Exception as e:print(f"❌ 保存文件失败: {e}")return Falsedef read_all_csv_files(folder_path):"""读取指定文件夹下的所有CSV文件(支持自动编码检测)"""if not os.path.exists(folder_path):print(f"错误: 文件夹 '{folder_path}' 不存在")return {}csv_files = list(Path(folder_path).glob("*.csv"))if len(csv_files) == 0:print(f"警告: 在 '{folder_path}' 中没有找到CSV文件")return {}dataframes = {}print(f"找到 {len(csv_files)} 个CSV文件:")print("-" * 50)for csv_file in csv_files:try:print(f"\n正在处理: {csv_file.name}")df, encoding_used = read_csv_with_auto_encoding(csv_file)print(f"✓ 成功读取: {csv_file.name}")print(f" 编码: {encoding_used}")print(f" 数据形状: {df.shape[0]} 行, {df.shape[1]} 列")print("-" * 50)print(f"原始数据列名: {list(df.columns)}")columns_to_extract = ['vn', '报文类型', '采集时间', '上报时间', '车辆状态', '采集时间', '定位有效性', '纬度类型', '经度类型', '经度', '纬度']extracted_df = df[columns_to_extract]#print(extracted_df)# 使用文件名(不含路径和扩展名)作为键file_name = csv_file.stemdataframes[file_name] = df#保存提取后信息到文件save_success = save_dataframe_to_csv(extracted_df, file_name, encoding_used)if save_success:print("✓ 提取的数据已成功保存")else:print("⚠️ 提取的数据保存失败")except Exception as e:print(f"✗ 无法读取 {csv_file.name}: {str(e)}")return dataframes# 主程序 if __name__ == "__main__":# 设置你的文件夹路径folder_path = "C:/Users/Downloads/LURM"# 读取所有CSV文件all_data = read_all_csv_files(folder_path)print(f"\n{'='*50}")print(f"总结: 成功读取了 {len(all_data)} 个CSV文件")# 显示每个文件的基本信息# for name, df in all_data.items():# print(f"\n{name}:")# print(f" 形状: {df.shape}")# if len(df.columns) > 0:# print(f" 列名: {list(df.columns)[:5]}{'...' if len(df.columns) > 5 else ''}")# if len(df) > 0:# print(f" 数据预览:")# print(df.head(2))
正式程序的核心代码和测试程序一样,只不过添加大量错误检测和现实文件信息提取的代码(该段代码已注释)。
代码写的比较烂,勿喷,有问题请回复指出。