DataFileMeta 

可以把它理解为 Paimon 表中每一个数据文件(Data File)的“身份证”。它详细记录了一个数据文件的所有关键信息,这些信息被用于查询优化、MVCC(多版本并发控制)、Compaction(合并)等核心功能。

DataFileMeta 的核心作用是描述一个数据文件的静态属性。Paimon 将一张表的数据存储在多个文件中(例如 Parquet、ORC 格式),DataFileMeta 就是这些文件在 Paimon 系统中的元数据表示。

其设计思想是不可变性(Immutability)。你会发现 DataFileMeta 的所有字段都是 final 的,并且没有 setter 方法。任何对 DataFileMeta 的修改(如 upgraderename)都会返回一个新的 DataFileMeta 对象,而不是修改自身。这种设计在分布式和并发环境中能极大地简化状态管理,保证线程安全。

核心字段(属性)分析

DataFileMeta 的字段可以分为几类,我们逐一来看。

// ...
public class DataFileMeta {
    // ...
    private final String fileName;
    private final long fileSize;
    // total number of rows (including add & delete) in this file
    private final long rowCount;
    private final @Nullable Long deleteRowCount;

    private final BinaryRow minKey;
    private final BinaryRow maxKey;
    private final SimpleStats keyStats;
    private final SimpleStats valueStats;

    private final long minSequenceNumber;
    private final long maxSequenceNumber;
    private final long schemaId;
    private final int level;

    private final List<String> extraFiles;
    private final Timestamp creationTime;
    private final @Nullable byte[] embeddedIndex;
    private final @Nullable FileSource fileSource;
    private final @Nullable String externalPath;
    // ...
}
a. 基础信息
  • fileName: 文件名。Paimon 会根据它定位到物理存储上的文件。
  • fileSize: 文件大小(字节)。可用于任务切分、成本估算等。
  • rowCount: 文件中的总行数。
  • deleteRowCount: 文件中标记为删除的行数。Paimon 的 Changelog-with-upsert 表支持 DELETE 操作,这个字段用于记录删除的行数。addRowCount 可以通过 rowCount - deleteRowCount 计算得出。
b. 统计信息(用于查询优化)
  • minKey / maxKey: 文件中主键的最小值和最大值。这是查询优化的关键。例如,当查询条件为 pk = 100 时,如果某个文件的 minKey 是 101,maxKey 是 200,Paimon 就可以直接跳过扫描这个文件。
  • keyStats / valueStats: 更详细的列统计信息,由 SimpleStats 类表示,通常包含列的 min/max 值和 null 计数。这使得 Paimon 可以对非主键列也进行谓词下推(Predicate Pushdown),进一步减少需要扫描的数据量。
c. MVCC 和 LSM-Tree 信息
  • minSequenceNumber / maxSequenceNumber: 文件中记录的最小和最大序列号。SequenceNumber 是 Paimon 实现 MVCC 的核心,每次提交(Commit)都会生成一个新的、递增的序列号。查询时,Paimon 会根据指定的 snapshot(对应一个 SequenceNumber)来决定哪些文件是可见的。
  • schemaId: 该文件使用的数据模式(Schema)的 ID。Paimon 支持 Schema Evolution(模式演进),这个字段确保了在读取旧文件时能找到正确的 Schema 来解析数据。
  • level: 文件在 LSM-Tree(Log-Structured Merge-Tree)结构中所处的层级。Paimon 使用 LSM 结构来优化写入。level = 0 的文件是新写入的,通常较小且数量多。后台的 Compaction 任务会不断将低层级的文件合并(merge)成高层级的、更大的文件,以优化读取性能。upgrade() 方法就是用于在文件被合并到更高层级时更新这个元信息。
d. 其他辅助信息
  • extraFiles: 额外关联的文件列表。早期版本用于存储 changelog 文件,现在用途较少。
  • creationTime: 文件的创建时间戳。
  • embeddedIndex: 内嵌的文件索引。对于小的索引(如 Bloom Filter),可以直接存储在元数据中,避免一次额外的 I/O。
  • fileSource: 文件的来源信息,用于追踪数据血缘。
  • externalPath: 文件的外部路径。如果该字段不为 null,表示文件存储在默认仓库路径之外。

序列化 Schema (SCHEMA)

DataFileMeta 对象本身是存在于 JVM 内存中的。为了持久化,Paimon 需要将它写入到 manifest 文件中。SCHEMA 字段就定义了 DataFileMeta 对象和持久化格式(BinaryRow)之间的映射关系。

// ...
public class DataFileMeta {

    public static final RowType SCHEMA =
            new RowType(
                    false,
                    Arrays.asList(
                            new DataField(0, "_FILE_NAME", newStringType(false)),
                            new DataField(1, "_FILE_SIZE", new BigIntType(false)),
                            new DataField(2, "_ROW_COUNT", new BigIntType(false)),
                            new DataField(3, "_MIN_KEY", newBytesType(false)),
                            new DataField(4, "_MAX_KEY", newBytesType(false)),
                            new DataField(5, "_KEY_STATS", SimpleStats.SCHEMA),
                            new DataField(6, "_VALUE_STATS", SimpleStats.SCHEMA),
                            new DataField(7, "_MIN_SEQUENCE_NUMBER", new BigIntType(false)),
                            new DataField(8, "_MAX_SEQUENCE_NUMBER", new BigIntType(false)),
                            new DataField(9, "_SCHEMA_ID", new BigIntType(false)),
                            new DataField(10, "_LEVEL", new IntType(false)),
                            // ... 其他字段
                            ));
    // ...
}

这个 SCHEMA 定义了 DataFileMeta 在 manifest 文件中的存储结构,每个 DataField 对应一个类的属性。这使得 Paimon 可以像读写普通表数据一样来读写元数据。

构造函数与核心方法

  • 构造函数DataFileMeta 提供了多个重载的构造函数,以方便在不同场景下创建实例。例如,forAppend() 工厂方法用于创建一个适用于 append-only 表的文件元数据,此时 minKeymaxKey 等字段会使用默认的空值。
  • 核心方法:
    • upgrade(int newLevel): 当文件被 Compaction 到更高层级时调用,返回一个 level 更新后的新 DataFileMeta 对象。
    • rename(String newFileName): 重命名文件时调用,返回一个 fileName 和 externalPath 更新后的新对象。
    • addRowCount() / deleteRowCount(): 计算并返回 Optional<Long> 类型的新增行数和删除行数。
    • fileFormat(): 从文件名中解析出文件格式(如 orcparquet)。
    • toString() / equals() / hashCode(): 标准的 Java 方法,用于调试、日志打印和在集合中进行比较。

总结

DataFileMeta 是 Paimon 数据湖存储引擎的基石之一。它通过一个不可变的、结构化的对象,精确描述了底层数据文件的全貌。这些丰富的元信息是 Paimon 实现高性能查询(谓词下推)、ACID 事务(通过 MVCC)和后台自动优化(LSM Compaction)等高级功能的技术保障。理解 DataFileMeta 的结构和作用,是深入理解 Paimon 工作原理的关键一步。

ManifestEntry

ManifestEntry  代表了清单文件(manifest file)中的一个条目。每个 ManifestEntry 对象都记录了一次对数据文件的原子操作,这个操作可以是添加一个文件,也可以是删除一个文件。

简单来说,ManifestEntry 的作用就是跟踪数据文件的变化ManifestEntry 包含以下关键信息:

  • kind: 表示这个条目的类型,是 ADD(添加)还是 DELETE(删除)。
  • partition: 数据文件所属的分区信息。
  • bucket: 数据文件所属的桶(bucket)的 ID。
  • totalBuckets: 总共有多少个桶。
  • file: 一个 DataFileMeta 对象,包含了被添加或删除的数据文件的详细元数据,例如:
    • 文件名
    • 文件大小
    • 数据行数
    • key的范围
    • 等等

ManifestEntry 的序列化和反序列化主要由 ManifestEntrySerializer 类来完成。它将一个 ManifestEntry Java 对象转换成 Paimon 内部的行存格式(InternalRow),DataFileMeta 对象作为其内嵌的一个字段也被序列化。

ManifestEntry 的序列化入口

在 ManifestEntry.java 中,提供了 toBytes() 方法作为序列化的入口,它通过一个 ThreadLocal 的 ManifestEntrySerializer 实例来执行序列化操作。

// ... existing code ...
    // ----------------------- Serialization -----------------------------

    private static final ThreadLocal<ManifestEntrySerializer> SERIALIZER_THREAD_LOCAL =
            ThreadLocal.withInitial(ManifestEntrySerializer::new);

    public byte[] toBytes() throws IOException {
        return SERIALIZER_THREAD_LOCAL.get().serializeToBytes(this);
    }

    public ManifestEntry fromBytes(byte[] bytes) throws IOException {
        return SERIALIZER_THREAD_LOCAL.get().deserializeFromBytes(bytes);
    }
}

ManifestEntry 的 Schema 定义

ManifestEntry 的序列化结构由其 SCHEMA 字段定义。它是一个 RowType,包含了5个字段。请注意,第5个字段 _FILE 的类型是 DataFileMeta.SCHEMA,这表明 DataFileMeta 是作为一个嵌套结构存在的。

// ... existing code ...
public class ManifestEntry implements FileEntry {

    public static final RowType SCHEMA =
            new RowType(
                    false,
                    Arrays.asList(
                            new DataField(0, "_KIND", new TinyIntType(false)),
                            new DataField(1, "_PARTITION", newBytesType(false)),
                            new DataField(2, "_BUCKET", new IntType(false)),
                            new DataField(3, "_TOTAL_BUCKETS", new IntType(false)),
                            new DataField(4, "_FILE", DataFileMeta.SCHEMA)));

// ... existing code ...

核心序列化逻辑 ManifestEntrySerializer

ManifestEntrySerializer 中的 convertTo 方法是序列化的核心。它接收一个 ManifestEntry 对象,并返回一个 InternalRow

  • kindpartitionbuckettotalBuckets 被直接或经过简单转换后设置到 GenericRow 的前4个字段。
  • file 字段(DataFileMeta 对象)entry.file() 获取到 DataFileMeta 对象,然后通过 dataFileMetaSerializer.toRow(entry.file()) 将其也序列化成一个 InternalRow。这个 InternalRow 被设置到 GenericRow 的第5个字段。

这样,DataFileMeta 就被作为嵌套行包含在了 ManifestEntry 的序列化结果中。

// ... existing code ...
    @Override
    public InternalRow convertTo(ManifestEntry entry) {
        GenericRow row = new GenericRow(5);
        row.setField(0, entry.kind().toByteValue());
        row.setField(1, serializeBinaryRow(entry.partition()));
        row.setField(2, entry.bucket());
        row.setField(3, entry.totalBuckets());
        row.setField(4, dataFileMetaSerializer.toRow(entry.file()));
        return row;
    }

    @Override
    public ManifestEntry convertFrom(int version, InternalRow row) {
// ... existing code ...
        return new ManifestEntry(
                FileKind.fromByteValue(row.getByte(0)),
                deserializeBinaryRow(row.getBinary(1)),
                row.getInt(2),
                row.getInt(3),
                dataFileMetaSerializer.fromRow(row.getRow(4, dataFileMetaSerializer.numFields())));
    }
// ... existing code ...

DataFileMeta 的 Schema

DataFileMeta 自身也是一个复杂的对象,它的序列化结构由 DataFileMeta.SCHEMA 定义,包含了文件名、大小、行数、统计信息等多个字段。


// ... existing code ...
    public static final RowType SCHEMA =
            new RowType(
                    false,
                    Arrays.asList(
                            new DataField(0, "_FILE_NAME", newStringType(false)),
                            new DataField(1, "_FILE_SIZE", new BigIntType(false)),
                            new DataField(2, "_ROW_COUNT", new BigIntType(false)),
                            new DataField(3, "_MIN_KEY", newBytesType(false)),
                            new DataField(4, "_MAX_KEY", newBytesType(false)),
                            new DataField(5, "_KEY_STATS", SimpleStats.SCHEMA),
                            new DataField(6, "_VALUE_STATS", SimpleStats.SCHEMA),
                            new DataField(7, "_MIN_SEQUENCE_NUMBER", new BigIntType(false)),
                            new DataField(8, "_MAX_SEQUENCE_NUMBER", new BigIntType(false)),
                            new DataField(9, "_SCHEMA_ID", new BigIntType(false)),
                            new DataField(10, "_LEVEL", new IntType(false)),
                            new DataField(
                                    11, "_EXTRA_FILES", new ArrayType(false, newStringType(false))),
                            new DataField(12, "_CREATION_TIME", DataTypes.TIMESTAMP_MILLIS()),
                            new DataField(13, "_DELETE_ROW_COUNT", new BigIntType(true)),
                            new DataField(14, "_EMBEDDED_FILE_INDEX", newBytesType(true)),
                            new DataField(15, "_FILE_SOURCE", new TinyIntType(true)),
                            new DataField(
                                    16,
                                    "_VALUE_STATS_COLS",
                                    DataTypes.ARRAY(DataTypes.STRING().notNull())),
                            new DataField(17, "_EXTERNAL_PATH", newStringType(true))));
// ... existing code ...

总结

ManifestEntry 的序列化是一个嵌套的过程:

  1. ManifestEntrySerializer 负责将 ManifestEntry 对象转换为一个 InternalRow
  2. 在这个过程中,对于 ManifestEntry 中的 DataFileMeta 成员变量,它会调用 DataFileMetaSerializer 将其也转换为一个 InternalRow
  3. 最终,DataFileMeta 的 InternalRow 作为 ManifestEntry 的 InternalRow 的一个字段被存储,实现了嵌套序列化。

ManifestFile 的结构

ManifestFile 本身是一个文件。它的核心作用是记录一个快照(Snapshot)中数据文件的变更信息。你可以把它理解成一个清单文件。

从类的定义来看:

public class ManifestFile extends ObjectsFile<ManifestEntry> {
//...
}
  1. 内部结构ManifestFile 继承了 ObjectsFile<ManifestEntry>,这意味着一个 ManifestFile 文件中存储了一个或多个 ManifestEntry 对象

  2. ManifestEntry: 每个 ManifestEntry 对象代表了一条对数据文件的变更记录。它主要包含以下信息:

    • kind: 变更类型,ADD(新增文件)或 DELETE(删除文件)。
    • partition: 数据文件所属的分区。
    • bucket: 数据文件所属的分桶。
    • file: 文件的元数据(DataFileMeta),包含了文件名、大小、schema ID 等详细信息。
  3. ManifestFileMeta: 当一个 ManifestFile 被写入后,会生成一个对应的 ManifestFileMeta 对象。这个对象是 ManifestFile 的摘要信息,包含了文件名、文件大小、新增文件数、删除文件数、分区统计信息等。这些元数据非常重要,可以用于在查询时进行文件过滤和裁剪,提高性能。

    // ... existing code ...
    public class ManifestFileMeta {
    
        // ...
    
        private final String fileName;
        private final long fileSize;
        private final long numAddedFiles;
        private final long numDeletedFiles;
        private final SimpleStats partitionStats;
        private final long schemaId;
        // ...
    }
    
  4. ManifestList: 多个 ManifestFileMeta 对象会被写入到另一个称为 ManifestList 的文件中。Snapshot 文件会直接指向这个 ManifestList 文件。

所以,整个结构是一个层级关系: Snapshot -> ManifestList -> ManifestFileMeta -> ManifestFile -> ManifestEntry

ManifestFileMeta

ManifestFileMeta 类代表一个清单文件(manifest file)的元数据。它的序列化和反序列化主要由 ManifestFileMetaSerializer 类来处理。

ManifestFileMeta 的实例在需要持久化时,会通过 ManifestFileMetaSerializer 被转换成一个 InternalRow 对象。这个转换的依据是 ManifestFileMeta 类中定义的静态常量 SCHEMA

  1. 序列化 Schema (ManifestFileMeta.SCHEMA)

    SCHEMA 定义了 ManifestFileMeta 序列化后的数据结构、字段顺序和类型。

    // ... existing code ...
    public static final RowType SCHEMA =
            new RowType(
                    false,
                    Arrays.asList(
                            new DataField(
                                    0, "_FILE_NAME", new VarCharType(false, Integer.MAX_VALUE)),
                            new DataField(1, "_FILE_SIZE", new BigIntType(false)),
                            new DataField(2, "_NUM_ADDED_FILES", new BigIntType(false)),
                            new DataField(3, "_NUM_DELETED_FILES", new BigIntType(false)),
                            new DataField(4, "_PARTITION_STATS", SimpleStats.SCHEMA),
                            new DataField(5, "_SCHEMA_ID", new BigIntType(false)),
                            new DataField(6, "_MIN_BUCKET", new IntType(true)),
                            new DataField(7, "_MAX_BUCKET", new IntType(true)),
                            new DataField(8, "_MIN_LEVEL", new IntType(true)),
                            new DataField(9, "_MAX_LEVEL", new IntType(true))));
    
    private final String fileName;
    // ... existing code ...
    

    从 SCHEMA 的定义可以看出,序列化后的 InternalRow 包含10个字段,其结构如下:

字段索引 字段名 数据类型 对应 ManifestFileMeta 属性
0 _FILE_NAME VarCharType fileName
1 _FILE_SIZE BigIntType fileSize
2 _NUM_ADDED_FILES BigIntType numAddedFiles
3 _NUM_DELETED_FILES BigIntType numDeletedFiles
4 _PARTITION_STATS RowType (来自 SimpleStats.SCHEMA) partitionStats
5 _SCHEMA_ID BigIntType schemaId
6 _MIN_BUCKET IntType (可为 null) minBucket
7 _MAX_BUCKET IntType (可为 null) maxBucket
8 _MIN_LEVEL IntType (可为 null) minLevel
9 _MAX_LEVEL IntType (可为 null) maxLevel

  1. 序列化与反序列化实现 (ManifestFileMetaSerializer)

    ManifestFileMetaSerializer 继承了 VersionedObjectSerializer,这意味着序列化是带版本信息的。当前版本为 2

    • 序列化 (convertTo): 将 ManifestFileMeta 对象转换为 InternalRow。它按照 SCHEMA 定义的顺序,从 ManifestFileMeta 对象中获取属性值并填充到 GenericRow 中。
    // ... existing code ...
    @Override
    public InternalRow convertTo(ManifestFileMeta meta) {
        return GenericRow.of(
                BinaryString.fromString(meta.fileName()),
                meta.fileSize(),
                meta.numAddedFiles(),
                meta.numDeletedFiles(),
                meta.partitionStats().toRow(),
                meta.schemaId(),
                meta.minBucket(),
                meta.maxBucket(),
                meta.minLevel(),
                meta.maxLevel());
    }
    // ... existing code ...
    
    • 反序列化 (convertFrom): 将 InternalRow 转换回 ManifestFileMeta 对象。它根据 SCHEMA 定义的索引和类型,从 InternalRow 中读取数据,并调用 ManifestFileMeta 的构造函数创建新对象。
    
      
    // ... existing code ...
    @Override
    public ManifestFileMeta convertFrom(int version, InternalRow row) {
        if (version != 2) {
    // ... existing code ...
        }
    
        return new ManifestFileMeta(
                row.getString(0).toString(),
                row.getLong(1),
                row.getLong(2),
                row.getLong(3),
                SimpleStats.fromRow(row.getRow(4, 3)),
                row.getLong(5),
                row.isNullAt(6) ? null : row.getInt(6),
                row.isNullAt(7) ? null : row.getInt(7),
                row.isNullAt(8) ? null : row.getInt(8),
                row.isNullAt(9) ? null : row.getInt(9));
    }
    // ... existing code ...
    
  2. 便捷方法 (toBytes/fromBytes)

    ManifestFileMeta 类提供了 toBytes() 和 fromBytes() 方法,但它们内部使用了 ThreadLocal 缓存的 ManifestFileMetaSerializer 实例来完成实际的序列化和反序列化工作,以提高性能。

    
      
    // ... existing code ...
    // ----------------------- Serialization -----------------------------
    
    private static final ThreadLocal<ManifestFileMetaSerializer> SERIALIZER_THREAD_LOCAL =
            ThreadLocal.withInitial(ManifestFileMetaSerializer::new);
    
    public byte[] toBytes() throws IOException {
        return SERIALIZER_THREAD_LOCAL.get().serializeToBytes(this);
    }
    
    public ManifestFileMeta fromBytes(byte[] bytes) throws IOException {
        return SERIALIZER_THREAD_LOCAL.get().deserializeFromBytes(bytes);
    }
    

ManifestFileMeta 的序列化是一个定义良好、带版本控制的过程。其核心是将 Java 对象与一个固定 SCHEMA 的 InternalRow 进行相互转换。这个 InternalRow 随后可以被 Paimon 的文件格式(如 Avro)写入到持久化存储中。整个结构清晰地反映了清单文件元数据所需要包含的所有信息。

ManifestFileMeta 不直接包含 ManifestEntry 

这个设计的核心思想是将一个文件的 “元数据摘要” 与其 “详细内容” 分开。

  1. ManifestFileMeta 的角色:元数据摘要 ManifestFileMeta 扮演的是一个清单文件(manifest file)的“属性”或“摘要”角色。它描述了这个清单文件本身,但不包含它的具体内容。它包含的字段都是对内容的聚合统计信息,例如:

    • fileName: 清单文件的物理路径,告诉 Paimon 去哪里找到它。
    • fileSize: 文件大小。
    • numAddedFiles / numDeletedFiles: 文件中包含的“增加文件”和“删除文件”的条目数量。
    • partitionStats: 文件中所有条目涉及到的分区值的统计信息(最大/最小值等)。
    • minBucket / maxBucket: 涉及到的最小和最大 bucket ID。

    Paimon 在做查询计划或者管理快照时,可以只读取 ManifestFileMeta 的列表。通过这些摘要信息,它可以快速地进行过滤和剪枝(pruning),判断哪些清单文件(manifest file)根本不需要打开读取,从而极大地提升性能。如果把所有 ManifestEntry 都放到 ManifestFileMeta 里,那每次读取元数据都会加载所有清单文件的全部内容,会非常低效。

  2. ManifestEntry 的角色:详细内容 ManifestEntry 列表是清单文件(manifest file)内部存储的实际数据。每一个 ManifestEntry 对象都代表一个具体的数据文件(DataFileMeta)的增加(ADD)或删除(DELETE)操作。

两者如何关联?

ManifestFileMeta 通过 fileName 字段与 ManifestEntry 列表关联起来。工作流程通常是这样的:

  1. Paimon 首先读取一个 snapshot 文件,获取到它包含的 ManifestFileMeta 列表。
  2. Paimon 遍历这个 ManifestFileMeta 列表,根据查询条件和 ManifestFileMeta 中的统计信息,决定哪些清单文件需要被进一步处理。
  3. 对于需要处理的 ManifestFileMeta,Paimon 会使用它的 fileName() 和 fileSize() 属性,通过 ManifestFile 类去读取物理文件,并将文件内容反序列化成一个 List<ManifestEntry>

可以从 ManifestFile 类的代码中看到这个过程:

// ... existing code ...
    public List<ManifestEntry> read(String fileName, @Nullable Long fileSize) {
// ... existing code ...
    }
// ... existing code ...

这个 read 方法接收 fileName 作为参数,返回 List<ManifestEntry>,清晰地展示了它们之间的关系。

与 ManifestEntry 包含 DataFileMeta 的对比

 ManifestEntry 包含 DataFileMeta 是一个很好的对比。

// ... existing code ...
    private final FileKind kind;
    // for tables without partition this field should be a row with 0 columns (not null)
    private final BinaryRow partition;
    private final int bucket;
    private final int totalBuckets;
    private final DataFileMeta file;
// ... existing code ...

这里的关系是直接包含。因为一个 ManifestEntry 的核心职责就是描述对某一个数据文件的操作,所以它必须包含这个数据文件的完整元数据(DataFileMeta)。这里的关系是一对一的,逻辑上紧密耦合,因此直接作为成员变量是合理的。

总结

  • ManifestFileMeta -> ManifestEntry:是 “元数据 -> 内容” 的关系。ManifestFileMeta 是对一个物理文件的描述和摘要,这个物理文件的内容是 ManifestEntry 列表。它们通过 fileName 间接关联。
  • ManifestEntry -> DataFileMeta:是 “操作 -> 对象” 的关系。ManifestEntry 描述了一个对数据文件的操作,它必须指明操作的对象是哪个数据文件,因此直接包含 DataFileMeta

ManifestList

ManifestList 本身是一个物理文件,其内部存储的是 ManifestFileMeta 对象的列表。所以,ManifestList 的序列化结构,实际上就是将多个 ManifestFileMeta 对象序列化后,逐条写入一个文件的结构。

  1. 继承自 ObjectsFile<ManifestFileMeta>

    ManifestList 继承了 ObjectsFile<ManifestFileMeta>ObjectsFile 是一个通用的工具类,用于将一系列指定类型(这里是 ManifestFileMeta)的对象写入或读出单个物理文件。这表明 ManifestList 复用了 Paimon 中通用的对象文件读写逻辑。

    public class ManifestList extends ObjectsFile<ManifestFileMeta> {
    //...
    }
    
  2. 序列化器 (ManifestFileMetaSerializer)

    在 ManifestList.Factory 的 create() 方法中,明确指定了用于序列化/反序列化 ManifestFileMeta 对象的序列化器是 ManifestFileMetaSerializer

    // ... existing code ...
    public ManifestList create() {
        RowType metaType = VersionedObjectSerializer.versionType(ManifestFileMeta.SCHEMA);
        return new ManifestList(
                fileIO,
                new ManifestFileMetaSerializer(),
                metaType,
                fileFormat.createReaderFactory(metaType),
                fileFormat.createWriterFactory(metaType),
                compression,
                pathFactory.manifestListFactory(),
                cache);
    }
    // ... existing code ...
    

    正如我们之前讨论的,ManifestFileMetaSerializer 会将一个 ManifestFileMeta 对象转换成一个 InternalRow

  3. 文件格式 (FileFormat)

    ManifestList 文件本身也是有格式的,通常是 Avro 格式(这是 Paimon 的默认配置)。ManifestList.Factory 在创建 ManifestList 实例时,会根据传入的 fileFormat(例如 avro)来创建对应的 FormatReaderFactory 和 FormatWriterFactory

    这意味着,ManifestList 文件的物理结构是:

    • 一个 Avro 文件(或其他指定格式的文件)。
    • 这个 Avro 文件的 Schema 是由 ManifestFileMeta.SCHEMA 决定的。
    • 文件中的每一行(record)都代表一个序列化后的 ManifestFileMeta 对象。
  4. 写操作 (write 方法)

    ManifestList 的 write 方法接收一个 List<ManifestFileMeta>,然后调用父类 ObjectsFile 的 writeWithoutRolling 方法,将这个列表中的每一个 ManifestFileMeta 对象通过 ManifestFileMetaSerializer 序列化成 InternalRow,再由 FormatWriter 写入到物理文件中。

    // ... existing code ...
    public Pair<String, Long> write(List<ManifestFileMeta> metas) {
        return super.writeWithoutRolling(metas.iterator());
    }
    // ... existing code ...
    

总结

ManifestList 的序列化结构可以概括为:

  • 它是一个物理文件,通常是 Avro 格式。
  • 这个文件的 Schema(数据结构)由 ManifestFileMeta.SCHEMA 定义。
  • 文件中的每一行记录都对应一个完整的、序列化后的 ManifestFileMeta 对象

Avro 自带格式:

  1. 自带 Schema: 文件本身就包含了数据的结构信息,这使得数据解析非常可靠,并且支持 Schema 的演进(比如未来在 ManifestFileMeta 中增加或删除字段)。
  2. 强类型: 每个字段都以其定义的类型(如 long, string, struct)进行存储,避免了类型转换的模糊性和错误。
  3. 压缩效率高: 二进制格式通常比文本格式更紧凑,并且可以与各种压缩算法(如 snappy, zstd)高效结合,节省存储空间。
  4. 生态系统支持: Avro  是大数据生态中非常成熟和通用的格式,易于与其他系统集成。

所以,当读取一个 ManifestList 文件时,会得到一个 List<ManifestFileMeta>,列表中的每个元素都是从文件的一行记录中反序列化出来的。

这个结构与 ManifestFile 非常相似,ManifestFile 的内容是 ManifestEntry 列表,而 ManifestList 的内容是 ManifestFileMeta 列表。这体现了 Paimon 文件组织中分层的元数据管理思想。

Snapshot

与 ManifestList 和 ManifestFile 不同,Snapshot 对象不是被序列化成 Avro 或其他二进制行式文件。Snapshot 对象是序列化成一个 JSON 文件

  1. JSON 格式 Snapshot 类使用了 Jackson 库(通过 org.apache.paimon.shade.jackson2 进行了 shaded,以避免依赖冲突)来进行 JSON 的序列化和反序列化。你可以从类和字段上的注解看出来:

    • @JsonIgnoreProperties(ignoreUnknown = true): 在反序列化时,如果 JSON 中有多余的字段,会忽略它们,这增强了向后兼容性。
    • @JsonProperty("..."): 将 Java 类的字段名与 JSON 文件中的键(key)进行映射。
    • @JsonCreator: 标记用于反序列化的构造函数。
    • @JsonGetter: 标记用于序列化的 getter 方法。
    • @JsonInclude(JsonInclude.Include.NON_NULL): 序列化时,如果字段值为 null,则不包含这个键值对,可以减小 JSON 文件的大小。
    // ... existing code ...
    @Public
    @JsonIgnoreProperties(ignoreUnknown = true)
    public class Snapshot implements Serializable {
    
        // ... existing code ...
    
        protected static final String FIELD_VERSION = "version";
        // ... 其他字段定义 ...
    
        // ... existing code ...
        @JsonProperty(FIELD_VERSION)
        @Nullable
        protected final Integer version;
    
        @JsonProperty(FIELD_ID)
        protected final long id;
    
        // ... 其他属性和注解 ...
    
        @JsonCreator
        public Snapshot(
                @JsonProperty(FIELD_VERSION) @Nullable Integer version,
                @JsonProperty(FIELD_ID) long id,
                // ... 其他构造函数参数 ...
        ) {
            // ... 构造函数实现 ...
        }
    
        public String toJson() {
            return JsonSerdeUtil.toJson(this);
        }
    
        public static Snapshot fromJson(String json) {
            return JsonSerdeUtil.fromJson(json, Snapshot.class);
        }
    
        public static Snapshot fromPath(FileIO fileIO, Path path) {
            // ... 从文件路径读取并反序列化 ...
        }
    }
    
  2. 序列化内容 一个 Snapshot JSON 文件包含了一个特定时间点(即一次 commit)的所有元数据信息。这些信息本身不是数据,而是指向其他元数据文件的指针以及一些统计信息。主要字段包括:

    • id: 快照的唯一ID。
    • schemaId: 此快照对应的表结构(Schema)的ID。
    • baseManifestList: 指向一个 ManifestList 文件的文件名。这个 manifest list 记录了从上一个快照继承来的所有数据文件。
    • deltaManifestList: 指向另一个 ManifestList 文件的文件名。这个 manifest list 只记录了本次提交中新增的数据文件。
    • changelogManifestList: 指向一个可选的 ManifestList 文件名,用于记录变更日志(Changelog)。
    • commitKind: 提交类型,如 APPENDCOMPACT 等。
    • timeMillis: 提交发生的时间戳。
    • 以及其他统计数据,如 totalRecordCountdeltaRecordCount 等。
  3. 物理存储 每个 Snapshot 对象序列化后,会以 snapshot-<id> 的形式存储在表的 snapshot 目录下。例如,ID 为 3 的快照会存为 snapshot-3 文件。

Snapshot 的序列化机制可以概括为:

  • 格式: 使用 JSON 格式。
  • 实现: 依赖 Jackson 库进行对象和 JSON 字符串之间的转换。
  • 内容: 存储的是指向 ManifestList 文件的文件名(指针) 和其他元数据信息,而不是 ManifestList 的实际内容。
  • 角色Snapshot 文件是 Paimon 数据湖的入口点。通过读取一个 snapshot-id 文件,Paimon 就能顺着 baseManifestList 和 deltaManifestList 的指引,层层找到最终的数据文件。

这种设计将轻量级的、易于人类阅读的 JSON 用于顶层元数据(Snapshot),而将高效的、结构化的二进制格式(如 Avro)用于存储更大量的底层元数据列表(ManifestListManifestFile),实现了性能和可读性之间的平衡。

Changelog

Changelog 作为 Snapshot 的子类,在序列化机制上和父类几乎完全相同,但它在概念和用途上有所区别。

序列化机制:完全继承,没有不同。Changelog 类没有定义任何自己独有的新字段。它继承了 Snapshot 的所有字段,并且在构造时,无论是从一个 Snapshot 对象创建,还是从 JSON 反序列化,都是对这些继承来的字段进行赋值。

从代码实现上看,Changelog 的序列化方式与 Snapshot 是一致的:

  1. 继承关系public class Changelog extends Snapshot 这行代码明确了 Changelog 继承了 Snapshot 的所有字段和方法。
  2. JSON 格式: 因为父类 Snapshot 使用 Jackson 进行 JSON 序列化,子类 Changelog 自然也遵循同样的机制。它会把所有继承来的字段(如 idschemaIdbaseManifestList 等)序列化成一个 JSON 字符串。
  3. 注解Changelog 类本身也使用了 @JsonIgnoreProperties(ignoreUnknown = true) 注解,并且它的构造函数使用了 @JsonCreator 和 @JsonProperty,这都和 Snapshot 的模式完全一样。
  4. 序列化/反序列化方法Changelog 提供了自己的 fromJson 和 fromPath 静态方法,但其内部实现依然是调用 JsonSerdeUtil 工具类,这和 Snapshot 的做法也是一致的。
// ... existing code ...
    public static Changelog fromJson(String json) {
        return JsonSerdeUtil.fromJson(json, Changelog.class);
    }

    public static Changelog fromPath(FileIO fileIO, Path path) {
// ... existing code ...
    }
// ... existing code ...

结论就是:在序列化技术和最终生成的 JSON 结构上,Changelog 和 Snapshot 没有区别。一个 Changelog 对象的 JSON 产物和一个 Snapshot 对象的 JSON 产物,如果字段值都相同,那么它们的格式和内容也是完全一样的。

为什么需要一个子类?

既然序列化没有区别,为什么还要创建一个 Changelog 子类呢?这主要是出于以下几个原因:

  1. 语义和类型安全:

    • 虽然它们的结构相同,但它们代表的概念不同。Snapshot 代表一个完整的表状态,而 Changelog 代表一次增量变更
    • 通过创建不同的类,可以在代码中实现类型安全。一个方法的参数如果是 Changelog,你就不能错误地传入一个 Snapshot 对象(尽管可以强制转换,但编译器会提示)。这使得代码的意图更清晰,也更健壮。例如,在处理 Changelog 相关的逻辑时,方法签名会明确要求 Changelog 类型,避免了混淆。
  2. 生命周期和管理:

    • Snapshot 文件会随着表的写入不断生成,并且会根据过期策略(snapshot.expire.num-retained-min 等)被清理掉。
    • Changelog 文件是为了让变更数据能够独立于 Snapshot 的生命周期而存在。当一个 Snapshot 因为过期被删除时,它对应的变更信息可以被保存为一个 Changelog 文件(物理文件名为 changelog-<id>),从而让下游的流式作业可以从更早的时间点开始消费变更数据。
    • ChangelogManager 就是专门用来管理这些 changelog- 文件的生命周期的。
  3. 未来的扩展性:

    • 如果未来 Changelog 需要增加一些 Snapshot 所没有的特殊属性,直接在 Changelog 子类中添加会非常方便,而不会影响到 Snapshot 类。

为什么序列化一样

Changelog 的核心目的是为了支持增量流式消费。

那为什么它和 Snapshot 的结构(Class 定义)完全一样呢?

答案是:真正的“增量”信息,并不体现在 Changelog 这个元数据文件的结构里,而是体现在它所指向的底层数据文件(Data File)的内容里。

可以把 Snapshot 或 Changelog 的 JSON 文件想象成一个 “货运清单” 。

  • 清单格式(元数据结构):无论是运送整个仓库的货物(全量),还是只运送今天新到和发走的货物(增量),货运清单本身的格式都可以是完全一样的。它都需要有清单ID、日期、发货人、以及一个“货物列表”字段。Snapshot 和 Changelog 类就是这个清单的“格式定义”,为了代码复用,它们被设计成了一样的。

  • 清单内容(指向的数据):区别在于清单上“货物列表”里记的东西。

    • 一个普通 Snapshot 的清单,它指向的数据文件里只记录了数据的“最终态”。比如一条记录从 A 更新到 B,数据文件里可能就只有一条 +U (B) 的记录。下游消费者只知道它现在是 B,但不知道它之前是 A。
    • 一个为增量消费设计的 Changelog,它指向的数据文件里会记录完整的“变更历史”。对于同样的更新操作,数据文件里会包含 -U (A) 和 +U (B) 两条记录。这样下游消费者就能精确地知道发生了什么变化。

这个“数据文件里到底记录什么内容”的行为,是由表的 changelog-producer 属性控制的。

  • changelog-producer = 'none'` (默认)

    • 数据文件只包含最新值。
    • Snapshot 文件中的 changelogManifestList 字段通常是空的。
    • 这种模式下,流读只能看到合并后的结果,无法获取精确的 before 值,不适合需要完整 changelog 的场景(比如计算 SUM)。
  • changelog-producer = 'input' / 'lookup' / 'full-compaction'`

    • Paimon 会在数据文件中额外生成并保留完整的 changelog 数据(-U+U 等)。
    • Snapshot 文件中的 changelogManifestList 字段会指向一个专门记录这些 changelog 数据文件的 ManifestList
    • 这样,即使 Snapshot 本身的结构没变,但它通过 changelogManifestList 指向了包含丰富变更信息的数据文件,从而实现了真正的增量流读能力。

既然开启 changelog-producer 后,Snapshot 文件自身就能指向增量数据,为什么还需要 Changelog 这个子类呢?

这就是为了生命周期管理

Snapshot 文件会根据过期策略被定期删除。但我们往往希望增量日志的保留时间比快照更长,以便下游任务可以从更早的时间点回溯消费。

因此,当一个带有宝贵增量信息(由 changelog-producer 生成)的 Snapshot 即将过期时,Paimon 会:

  1. 读取这个 Snapshot 对象。
  2. 用它创建一个一模一样的 Changelog 对象(new Changelog(snapshot))。
  3. 将这个 Changelog 对象序列化成一个 changelog-<id> 文件,存入 changelog/ 目录。
  4. 这样,即使原始的 snapshot-<id> 文件被删除了,它的所有元数据和指向增量数据文件的能力,都以 Changelog 的形式被永久或更久地保留了下来。

总结

Changelog 在序列化层面与 Snapshot 没有区别,它完全复用了父类的 JSON 序列化机制。

  • 结构相同是为了代码复用Changelog 和 Snapshot 都是元数据容器,它们的结构(字段)是一样的。
  • 内容不同是实现增量的关键:是否能增量消费,取决于它们指向的数据文件中是否包含完整的变更记录(-U/+U),这由 changelog-producer 配置决定。
  • Changelog 类是语义和生命周期的区分:它在代码层面提供了清晰的类型区分,并且它的核心使命是将 Snapshot 的增量元数据从其自身的过期策略中“解救”出来,实现更长的生命周期。

可以把它想象成一种“身份转换”或者“贴标签”。一个 Snapshot 对象在即将被系统根据过期策略清理时,可以“变身”成一个 Changelog 对象被持久化下来。它们携带的信息(字段)完全一样,但“身份”(Java 类型)和“归宿”(存储路径以 changelog- 开头,并由 ChangelogManager 管理)不同了。

设立 Changelog 这个子类的主要目的是为了在代码层面提供更清晰的语义区分和类型安全,并对 Snapshot 和 Changelog 这两种不同生命周期的元数据文件进行分开管理,同时也为未来的功能扩展提供了便利。

Tag

Tag 在概念和实现上都与 SnapshotChangelog 有着显著的区别。

Tag 的核心含义是一个 “命名的快照”“标签” 。可以把它理解为 Git 中的 tag。它的主要作用是:

  • 人类可读的引用:为某个特定的 Snapshot ID(通常是一个长整型数字)赋予一个有意义的、人类可读的名称,比如 release-v1.0 或 daily-backup-20231026
  • 防止过期:被打了 Tag 的 Snapshot 以及它所依赖的所有数据文件,默认情况下不会被 Paimon 的自动过期清理机制删除。这为数据版本回溯和归档提供了一个可靠的保障。

Tag 的序列化和存储机制与 Snapshot 和 Changelog 完全不同,这主要体现在存储的内容和方式上。

存储内容:轻量级指针

  • Snapshot/Changelog:它们的文件(如 snapshot-3)存储的是一个完整的元数据对象,包含了指向 ManifestList 的文件名、提交类型、记录数统计等所有快照信息。
  • Tag:一个 Tag 文件(如 tag/release-v1.0)存储的不是一个完整的 Snapshot 对象,而是一个非常轻量级的指针对象。这个对象只包含最核心的指向信息,主要是:
    • snapshotId: 这个 Tag 指向的快照 ID。
    • schemaId: 对应快照的 Schema ID。
    • timeMillis: 对应快照的提交时间。
    • options: 对应快照的表配置。

这种设计非常高效,创建一个 Tag 几乎没有存储开销,因为它只是创建了一个包含快照 ID 的小文件,而不是复制一份完整的快照元数据。

和 SnapshotChangelog 一样,Tag 的指针对象也是被序列化成 JSON 格式。

存储方式

  • 存储目录:Tag 文件存储在表路径下的 tag/ 目录中,与 snapshot/ 和 changelog/ 目录并列。
  • 文件命名:Tag 文件的文件名就是 Tag 的名称。例如,你创建一个名为 v2.0 的 Tag,Paimon 就会在 tag/ 目录下创建一个名为 v2.0 的文件。
warehouse
└── default.db
    └── my_table
        ├── snapshot/
        │   ├── snapshot-1
        │   └── snapshot-2
        ├── changelog/
        └── tag/
            ├── release-v1.0  # 文件内容: {"snapshotId":1, "schemaId":0, ...}
            └── v2.0          # 文件内容: {"snapshotId":2, "schemaId":0, ...}

总结对比

特性 Snapshot Changelog Tag
​核心作用​ 代表某个时间点的完整表状态 代表一次增量变更,生命周期可独立于快照 为某个快照创建永久性的、可读的别名
​存储目录​ snapshot/ changelog/ tag/
​文件命名​ snapshot-<ID> changelog-<ID> <tag-name>
​序列化格式​ JSON JSON JSON
​存储内容​ 完整的快照元数据对象 完整的快照元数据对象 轻量级的指针对象 (主要包含 snapshotId)
​生命周期​ 会被自动过期策略清理 从过期的快照转换而来,有独立的生命周期 默认永久存在,保护其指向的快照不被清理

总而言之,Tag 在 Paimon 中是一个非常重要的功能,它通过一个轻量级的、序列化为 JSON 的指针文件,实现了对特定数据版本的永久性、可读性引用,这在生产环境中对于数据治理、版本控制和故障恢复至关重要。

单个 DataFileMeta 对象内存分析

一个 DataFileMeta 对象本身占用的内存大小是可变的,主要取决于表的结构和配置。我们可以分析它的成员变量来估算其大小。

以下是 DataFileMeta 类的主要字段和它们的大致内存占用分析(基于 64 位 JVM):

  • 固定大小字段:

    • fileSizerowCountminSequenceNumbermaxSequenceNumberschemaId: 5个 long 类型,共 5 * 8 = 40 字节。
    • level: 1个 int 类型,4 字节。
    • creationTime: 1个 Timestamp 对象,内部包含 long 和 int,约 12 字节 + 对象开销。
    • deleteRowCount: 1个 Long 包装对象,约 8 字节 + 对象开销。
    • fileSource: 1个 FileSource 枚举引用。
  • 可变大小字段(主要内存消耗来源):

    • fileName (String): 文件名字符串。Paimon 的文件名通常包含 UUID,长度较长,例如 data-b3d3b3f0-d8a8-4195-a617-1f4973434b72-0.orc。一个文件名可能占用 100-200 字节。
    • minKeymaxKey (BinaryRow): 存储了文件的最小和最大主键。其大小取决于主键的列数和类型。如果主键很复杂,这里会占用较多空间,可能在几十到几百字节。
    • keyStatsvalueStats (SimpleStats): 存储了主键列和值列的统计信息(最大值、最小值、null 计数等)。这是内存占用的主要部分,特别是 valueStats。其大小与表中列的数量和启用的统计信息级别 (stats.mode) 直接相关。如果为一个宽表(列非常多)的所有列都收集统计信息,这部分会占用数千字节。
    • extraFiles (List<String>): 存储关联的额外文件列表(例如索引文件)。如果存在,每个文件名都会增加内存占用。
    • embeddedIndex (byte[]): 内嵌的索引数据,例如布隆过滤器。其大小取决于索引类型和数据量,可能从几十字节到几千字节不等,但通常是可选的。
    • valueStatsCols (List<String>): 记录了哪些列收集了 valueStats。如果收集统计信息的列很多,这个字符串列表也会很大。
    • externalPath (String): 外部存储路径。如果使用,它会存储一个完整的路径字符串。

小结:

一个单独的 DataFileMeta 对象,对于一个简单的表,其内存占用可能在几百字节;但对于一个列很多(宽表)且开启了详细统计信息的表,其内存占用可能会达到 几 KB 甚至更高

内存占用会很多吗?

在特定场景下会非常多。

问题不在于单个 DataFileMeta 对象有多大,而在于 查询规划(Planning)时需要在内存中加载多少个这样的对象

当 Flink 或 Spark 的 Driver/JobManager 节点为 Paimon 表生成执行计划时,它需要读取表的 manifest 文件,这些文件里就存储了所有数据文件(DataFile)的元数据,也就是 DataFileMeta 列表。

想象一个场景: 一个大型分区表,经过了长时间的流式写入,并且没有进行充分的合并(Compaction)。这可能导致每个分区、每个 bucket 下都积累了成千上万个小的碎文件。

如果一个查询需要扫描多个分区,那么 Driver 节点就可能需要加载 数万甚至数百万个 DataFileMeta 对象到内存中。

举例计算: 假设一个表有 1,000,000 个数据文件,每个 DataFileMeta 平均占用 3 KB。 总内存占用 = 1,000,000 * 3 KB = 3 GB

Paimon 的应对策略

Paimon 自身设计了多种机制来缓解这个问题:

  1. 分区裁剪 (Partition Pruning): 根据 WHERE 条件过滤分区,只读取相关分区的 manifest 文件,从源头上减少了需要处理的元数据量。
  2. 元数据过滤 (Metadata Filtering): Paimon 在 manifest 文件本身也存储了统计信息(如 ManifestFileMeta 中的 partitionStats)。在读取和反序列化完整的 DataFileMeta 列表之前,可以利用这些更高层级的统计信息来跳过整个 manifest 文件或者其中的部分文件,避免了不必要的开销。
  3. 文件合并 (Compaction): 这是最根本的解决方案。通过定期运行 Compaction 作业,可以将大量小文件合并成少量大文件。文件数量的减少直接导致了 DataFileMeta 数量的减少,从而显著降低了查询规划时的内存压力。

结论

DataFileMeta 的内存占用对于单个对象来说是可控的,但当数据文件数量巨大时,其在 Driver/JobManager 端的总内存消耗会成为一个严重的性能瓶颈。

因此,对于生产环境中的 Paimon 表,定期进行文件合并(Compaction)是至关重要的,它能有效控制文件数量,从而保证查询规划的稳定性和效率。

Logo

葡萄城是专业的软件开发技术和低代码平台提供商,聚焦软件开发技术,以“赋能开发者”为使命,致力于通过表格控件、低代码和BI等各类软件开发工具和服务

更多推荐