Python自动化从入门到实战(19) 小白也能操作的CSV文件批量合并教程

在日常工作中,我们经常会遇到需要合并多个CSV文件的情况。手动复制粘贴不仅效率低下,还容易出错。今天我就来教大家如何使用一个简单的Python脚本来自动化完成这项工作,让你的数据处理效率提升10倍!

完整代码下载地址

下载地址

合并前

1班.csv
在这里插入图片描述
2班.csv
在这里插入图片描述

合并后的效果:1班,2班,3班的数据合并为一个csv文件

在这里插入图片描述

一、什么是CSV文件?

CSV(Comma-Separated Values)是一种简单的文本文件格式,用于存储表格数据。它的每一行代表一条记录,字段之间用逗号分隔。CSV文件可以用Excel、记事本等软件打开,是数据交换的常用格式。

二、为什么需要合并CSV文件?

在实际工作中,我们可能会遇到这样的场景:

  • 从不同系统导出的相同格式数据需要汇总分析
  • 按时间段分割的日志文件需要合并查看
  • 多个部门提供的同类数据表格需要统一处理

手动合并这些文件不仅费时费力,还容易出现格式错误或数据丢失。使用自动化工具可以完美解决这些问题!

三、准备工作

1. 安装Python

这个脚本需要Python环境运行。如果你还没有安装Python,可以按照以下步骤操作:

  1. 访问 Python官网
  2. 下载最新版本的Python安装包
  3. 安装时勾选"Add Python to PATH"选项
  4. 完成安装后,打开命令提示符,输入python --version验证安装成功

2. 安装必要的库

我们的脚本需要使用pandas库来处理CSV文件。安装方法很简单:

  1. 打开命令提示符
  2. 输入以下命令:
pip install pandas

稍等片刻,pandas库就会安装完成。

四、脚本工作原理

我们的CSV合并脚本具有以下强大功能:

1. 智能编码识别

脚本会自动尝试多种常见的文件编码格式(如UTF-8、GBK等),确保能正确读取各种来源的CSV文件。

2. 空白行过滤

脚本会自动删除完全空白的行和只有空字符串的行,保证合并后的文件干净整洁。

3. 标题行处理

智能保留第一个文件的标题行,避免重复的标题在合并后出现。

4. 源文件追踪

为每条数据添加源文件路径信息,方便后续追踪数据来源。

5. 详细日志记录

记录整个合并过程的详细信息,包括处理了多少文件、合并了多少行数据等。

五、代码解析

下面我们来详细解析csv合并.py脚本的核心代码,让大家了解它是如何工作的:

1. 导入必要的库

import pandas as pd
import os
import glob
import logging
from datetime import datetime

这部分代码导入了脚本需要的所有库:

  • pandas:用于处理CSV文件和数据操作
  • os:用于文件路径和目录操作
  • glob:用于查找文件
  • logging:用于日志记录
  • datetime:用于获取当前日期生成日志文件名

2. 配置日志系统

# 确保logs文件夹存在
log_dir = os.path.join(os.path.dirname(__file__), 'logs')
os.makedirs(log_dir, exist_ok=True)

# 获取当前日期,格式为YYYYMMDD
current_date = datetime.now().strftime('%Y%m%d')
log_file = os.path.join(log_dir, f'{current_date}.log')

# 配置日志,输出到文件和控制台
logging.basicConfig(
    level=logging.INFO, 
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_file, encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

这段代码负责配置日志系统:

  • 首先创建logs文件夹(如果不存在)
  • 生成以当前日期命名的日志文件名(如20251013.log
  • 配置日志同时输出到文件和控制台,方便查看
  • 设置日志格式包含时间、日志级别和消息内容

3. 设置文件路径

# 设置文件路径
source_dir = os.path.join(os.path.dirname(__file__), 'source')
output_file = os.path.join(os.path.dirname(__file__), 'csv_all.csv')

# 常见编码列表
COMMON_ENCODINGS = ['utf-8', 'utf-8-sig', 'gbk', 'latin1', 'cp1252']

这里定义了:

  • source_dir:存放源CSV文件的文件夹路径
  • output_file:合并后的输出文件路径
  • COMMON_ENCODINGS:常见的文件编码列表,用于自动尝试解码文件

4. 核心合并函数

def merge_csv_files():
    """合并CSV文件,只保留第一个文件的标题,去除空白行"""
    try:
        # 获取所有CSV文件
        csv_files = glob.glob(os.path.join(source_dir, '*.csv'))
        if not csv_files:
            logger.error(f"在目录 {source_dir} 中未找到CSV文件")
            return False
        
        logger.info(f"找到 {len(csv_files)} 个CSV文件")
        
        all_data = []
        first_file = True
        
        for i, file_path in enumerate(csv_files):
            file_name = os.path.basename(file_path)
            logger.info(f"处理文件 {i+1}/{len(csv_files)}: {file_name}")

这是合并函数的开始部分:

  • 使用glob查找所有CSV文件
  • 检查是否找到文件,如果没有则记录错误并返回
  • 创建all_data列表用于存储所有文件的数据
  • first_file标志用于跟踪是否是第一个文件(用于保留标题行)

5. 智能编码识别

            # 尝试使用不同的编码读取文件
            df = None
            for encoding in COMMON_ENCODINGS:
                try:
                    # 读取CSV文件
                    df = pd.read_csv(file_path, encoding=encoding)
                    logger.info(f"  - 使用编码 {encoding} 成功读取文件")
                    break
                except Exception:
                    continue
            
            if df is None:
                logger.error(f"  - 无法读取文件 {file_name},所有编码尝试都失败")
                continue

这部分实现了智能编码识别:

  • 依次尝试COMMON_ENCODINGS列表中的每种编码
  • 使用try-except捕获可能的解码错误
  • 一旦找到能成功读取的编码,就使用该编码并跳出循环
  • 如果所有编码都失败,则记录错误并跳过该文件

6. 空白行过滤

            # 移除完全空白的行
            df = df.dropna(how='all')
            
            # 移除只有空字符串的行
            df = df[df.apply(lambda x: x.astype(str).str.strip().astype(bool).any(), axis=1)]

这段代码实现了空白行过滤功能:

  • 第一行移除所有列都是NaN(空值)的行
  • 第二行移除那些虽然不是NaN但只包含空字符串(如空格、制表符等)的行
  • 使用了apply函数和lambda表达式对每一行进行处理

7. 标题行处理

            # 如果是第一个文件,保留标题行
            # 否则,跳过标题行(假设所有文件都有相同的标题)
            if not first_file and not df.empty:
                df = df.iloc[1:]
            else:
                first_file = False

这段代码处理标题行:

  • 对于第一个文件,保留其标题行
  • 对于后续文件,跳过第一行(即标题行),避免重复的标题
  • 使用iloc[1:]来选择从第二行开始的数据

8. 添加源文件路径

            # 为当前文件数据添加FileName列,记录文件绝对路径
            if not df.empty:
                # 获取文件的绝对路径
                absolute_path = os.path.abspath(file_path)
                # 添加FileName列
                df['FileName'] = absolute_path
                all_data.append(df)
                logger.info(f"  - 文件 {file_name} 添加了 {len(df)} 行数据,并添加了文件路径列")
            else:
                logger.warning(f"  - 文件 {file_name} 没有有效数据")

这部分代码为数据添加源文件路径信息:

  • 使用os.path.abspath()获取文件的绝对路径
  • 为DataFrame添加一个新的列FileName,值为文件的绝对路径
  • 将处理好的数据添加到all_data列表中
  • 记录添加的数据行数

9. 合并数据并保存

        if all_data:
            # 合并所有数据
            merged_df = pd.concat(all_data, ignore_index=True)
            
            # 再次移除完全空白的行(以防合并过程中产生)
            merged_df = merged_df.dropna(how='all')
            
            # 保存合并后的文件,确保有写入权限
            try:
                merged_df.to_csv(output_file, index=False, encoding='utf-8-sig')
                logger.info(f"CSV文件合并成功!")
                logger.info(f"- 处理文件数: {len(csv_files)}")
                logger.info(f"- 合并后数据行数: {len(merged_df)}")
                logger.info(f"- 输出文件: {output_file}")
                return True
            except PermissionError:
                logger.error(f"[Errno 13] 没有写入权限: '{output_file}'")
                logger.error("请确保没有其他程序正在使用该文件,并且您有写入权限。")
                return False
        else:
            logger.error("没有成功合并任何数据")
            return False
            
    except Exception as e:
        logger.error(f"合并CSV文件时发生错误: {e}")
        return False

这是函数的最后部分,负责合并数据和保存文件:

  • 使用pd.concat()合并所有DataFrame
  • ignore_index=True确保生成连续的行索引
  • 再次过滤空白行,确保数据质量
  • 使用try-except捕获可能的权限错误
  • 保存文件时使用utf-8-sig编码,确保中文正常显示
  • 记录详细的成功信息或错误信息

10. 主程序入口

if __name__ == "__main__":
    print("开始合并CSV文件...")
    success = merge_csv_files()
    if success:
        print(f"CSV文件合并成功!")
    else:
        print("CSV文件合并失败,请查看日志获取详细信息。")
    # 添加按Enter键退出,便于用户查看结果
    input("按Enter键退出...")

这是脚本的主入口:

  • 使用if __name__ == "__main__":确保只有直接运行脚本时才执行以下代码
  • 打印开始信息
  • 调用merge_csv_files()函数执行合并操作
  • 根据函数返回结果显示成功或失败信息
  • 添加input()函数让程序在结束前等待用户按键,方便查看结果

六、使用方法

1. 创建工作目录结构

首先,我们需要创建以下目录结构:

csv_combine/
│  csv合并.py    # 我们的合并脚本
│  csv_all.csv   # 合并后的输出文件
└── source/      # 存放需要合并的CSV文件

2. 准备CSV文件

将所有需要合并的CSV文件复制到source文件夹中。请确保这些文件具有相同的列结构(即相同的标题行)。

3. 运行脚本

有两种方式可以运行这个脚本:

方法一:双击运行

直接双击csv合并.py文件即可运行脚本。运行完成后,会在屏幕上显示成功信息。

方法二:命令行运行

  1. 打开命令提示符
  2. 切换到csv_combine目录
  3. 输入以下命令:
python csv合并.py

4. 查看合并结果

脚本运行成功后,会在csv_combine目录下生成一个名为csv_all.csv的文件,这就是合并后的结果文件。你可以用Excel或其他软件打开查看。

七、实战案例

案例:合并多个标签数据表

假设我们有28个设备标签数据表,分别存储在不同的CSV文件中(AI.csv、M1.csv、M2.csv…V9.csv)。这些文件包含相同的列:Tag Name、Address、Data Type等。

  1. 将这28个CSV文件复制到source文件夹
  2. 运行csv合并.py脚本
  3. 脚本会自动:
    • 读取所有28个CSV文件
    • 只保留第一个文件的标题行
    • 过滤掉所有空白行
    • 为每行数据添加源文件路径信息
    • 将285行有效数据合并到一个文件中

整个过程只需要几秒钟,比手动合并效率提升百倍!

八、常见问题解答

Q1: 运行时提示缺少pandas库怎么办?

A: 请按照准备工作中的步骤,使用pip安装pandas库。

Q2: 合并后的文件出现乱码怎么办?

A: 这是因为文件编码问题。我们的脚本已经内置了多种编码尝试机制,一般情况下能自动解决这个问题。如果仍然出现乱码,可以尝试用Excel打开后另存为UTF-8格式。

Q3: 如何确保所有文件都有相同的列结构?

A: 在合并前,请检查所有CSV文件的第一行(标题行)是否完全一致。如果不一致,合并后的文件可能会出现列错位的情况。

Q4: 脚本支持合并大量CSV文件吗?

A: 是的,脚本可以轻松处理几十甚至上百个CSV文件。处理速度主要取决于文件大小和计算机性能。

九、进阶功能介绍

1. 查看详细日志

脚本在运行过程中会输出详细的日志信息,包括:

  • 找到的文件数量
  • 每个文件的处理情况
  • 使用的编码格式
  • 添加的数据行数
  • 最终合并的总行数

通过这些日志,你可以清楚地了解整个合并过程。

2. 数据追踪功能

合并后的文件中包含了一个FileName列,记录了每行数据来自哪个源文件的绝对路径。这个功能对于数据溯源非常有用,特别是当你需要验证或追踪数据来源时。

十一、总结

通过本文介绍的CSV合并脚本,你可以轻松实现多个CSV文件的自动合并,大大提高工作效率。这个脚本具有以下优点:

✅ 操作简单,双击即可运行
✅ 智能处理各种编码格式
✅ 自动过滤空白行
✅ 保留正确的标题行
✅ 添加源文件追踪信息
✅ 详细的日志记录,按日期保存

无论是数据分析、报表生成还是数据迁移,这个工具都能为你节省大量宝贵的时间。通过代码解析部分,你还可以了解脚本的工作原理,甚至可以根据自己的需求进行定制修改!


完整代码下载地址

下载地址

小贴士:如果你需要处理大量CSV文件,建议定期清理source文件夹,只保留当前需要合并的文件,这样可以提高脚本的运行效率。

Logo

葡萄城是专业的软件开发技术和低代码平台提供商,聚焦软件开发技术,以“赋能开发者”为使命,致力于通过表格控件、低代码和BI等各类软件开发工具和服务

更多推荐