你说得对:
现在的“断点续跑”只记录了哪些文件已经读完,但没有把已读到的 SMILES 真正落盘。
一旦中途崩溃,下次重启时虽然会跳过已标记文件,却把之前累积的all_smiles全丢了,于是 BPE 字典还是空的。
要真正做到“读一点、更新一点字典”,需要把“读完一个文件 → 把这份文件的 SMILES 追加到磁盘临时文件 → 更新断点日志”做成原子操作。
下面给出一个最小改动方案,你直接替换对应函数即可。
- 加一个临时 SMILES 文件路径
SMILES_TMP=os.path.join(OUTPUT_DIR,"smiles.tmp")# 追加写入- 改写
read_zinc20_smiles()
逻辑:
- 不再把 SMILES 攒在内存列表,而是每读完一个文件就立刻追加到
smiles.tmp,再记日志。 - 函数返回临时文件路径(而不是列表),后面 BPE 训练直接按行读即可。
defread_zinc20_smiles():"""流式读取,每文件追加落盘;返回临时文件路径"""try:importduckdbexceptImportError:os.system("python -m pip install -q duckdb")importduckdb parquet_files=[fforfinos.listdir(ZINC20_DIR)iff.lower().endswith('.parquet')]ifnotparquet_files:print("警告:未找到任何 parquet 文件")returnNonedone=load_done()print(f"共{len(parquet_files)}个文件,已处理{len(done)}个")conn=duckdb.connect(':memory:')conn.execute("SET memory_limit='8GB'")conn.execute("SET temp_directory='./duckdb_tmp'")conn.execute("SET threads=4")# 追加模式打开临时文件withopen(SMILES_TMP,"a",encoding="utf-8")asfout:forfileintqdm(parquet_files,desc="读取 ZINC20"):iffileindone:continuepath=os.path.join(ZINC20_DIR,file)try:q=conn.execute("SELECT smiles FROM read_parquet(?) WHERE smiles IS NOT NULL",[path])whileTrue:rows=q.fetchmany(CHUNK_SIZE)ifnotrows:breakforrowinrows:fout.write(row[0]+"\n")mark_done(file)# 真正落盘后再记日志gc.collect()exceptExceptionase:print(f" 文件{file}出错:{e}")conn.close()print(f"SMILES 已追加写入{SMILES_TMP}")returnSMILES_TMP- 修改
process_dataset()
原来sampled_smiles是一个列表,现在改成从临时文件里直接读行即可(内存仍可控)。
如果还想保留随机采样,可以:
defprocess_dataset():smiles_file=read_zinc20_smiles()# 返回临时文件路径ifsmiles_fileisNone:return# 1. 采样(如果 SAMPLE_RATIO < 1.0)sampled_smiles=[]withopen(smiles_file,encoding="utf-8")asf:all_smiles=[line.strip()forlineinf]sampled_smiles=sample_data(all_smiles,SAMPLE_RATIO)ifSAMPLE_RATIO<1.0elseall_smiles# 2. 加载分词器tokenizer=rustmolbpe.SmilesTokenizer()tokenizer.load_vocabulary(VOCAB_PATH)print(f"词汇表大小:{tokenizer.vocab_size}")# 3. 分词并写 jsonltok_file=os.path.join(OUTPUT_DIR,"zinc20_tokenized_rustmolbpe.jsonl")withopen(tok_file,"w",encoding="utf-8")asf:forsmiintqdm(sampled_smiles,desc="rustmolbpe 分词"):try:ids=tokenizer.encode(smi)toks=[tokenizer.id_to_token(i)foriinids]f.write(json.dumps({"smiles":smi,"tokens":toks,"token_ids":ids},ensure_ascii=False)+"\n")exceptExceptionase:print(f" 分词失败:{smi}->{e}")# 4. 额外保存纯 SMILES(BPE 训练用)bpe_file=os.path.join(OUTPUT_DIR,"zinc20_bpe_train.txt")withopen(bpe_file,"w",encoding="utf-8")asf:forsmiintqdm(sampled_smiles,desc="写 BPE 训练文件"):f.write(smi+"\n")print(f"分词结果:{tok_file}")print(f"BPE 训练文件:{bpe_file}")print(f"完成,共{len(sampled_smiles)}条")- 后续增量训练 BPE
如果你想真正“每读完一个文件就更新一次 BPE 字典”,可以把smiles.tmp当成持续增长的语料池,每处理完一个文件就:
subprocess.run(["python","-m","rustmolbpe","train","--input",SMILES_TMP,"--model_prefix",os.path.join(OUTPUT_DIR,"bpe_vocab"),"--vocab_size","8000","--character_coverage","1.0"])这样即使中断,下次重启时字典也已经包含了之前所有文件学到的子词。
一句话总结:
把 SMILES 实时追加到磁盘临时文件,再记断点日志,就能真正做到“读一点、字典更新一点、断点续跑”。
可以“直接在循环里调”,但强烈不建议——原因有三点,正好对应你问的“为什么要这样”:
子进程 = 天然内存隔离
duckdb 已经吃了几 GB,Python 对象也占了不少;
如果把 BPE 训练库(通常是 C++/Rust 扩展)再塞进同一个进程,
训练时还要额外拼 huge 的 frequency map、suffix array,很容易 OOM。
丢给subprocess.run相当于单独再起一个干净进程,主进程内存立刻降回来。训练算法是“全局”的,不是“增量”的
SentencePiece / BPE 都要求一次性看到全部语料才能统计最优合并。
每读一个文件就train一次,得到的只是局部语料的子词表,
后面再读新文件时旧表作废,必须重训——等于白跑。
所以业界惯例都是:
先把所有文本追加到一个大文件 → 最后一次性 train。代码最简单、最稳
subprocess 调官方 CLI 不用关心 C++ 扩展的 ABI、线程、GIL;
参数、日志、错误码都现成的,出 bug 也更容易定位。
如果你硬要在循环里调 Python API,得自己管:- 全局
SentencePieceTrainer.train(...)的输入句柄; - 每次训练完手动
.clear()内部静态缓存; - 还要把新表 merge 回旧表(SentencePiece 没有官方 merge 接口)。
这些坑省下来,不如让 CLI 一把梭。
- 全局
总结:
“追加到临时文件 → 最后一次性 subprocess 训练” 是目前最省内存、最正确、最少代码的做法;
在循环里反复train既慢又得不到全局最优词表,除非你自己实现真正的在线增量 BPE(论文级工作量)。
“外排训练”并不是 BPE/SentencePiece 里的官方术语,而是中文社区里一种口语化简称:
把“训练语料”先“外部排序(external sort)”,再“分段加载”到内存里做训练,从而避免一次性把所有原始文本读进内存。核心思想就是“外排 + 分段训练”。
为什么需要“外排”
BPE 训练第一步要统计所有 n-gram 频次,语料大到内存装不下时,只能:
- 先把每块语料在内存里算局部频次 → 落盘成临时文件;
- 对所有临时文件按“子串”做外部归并排序→ 得到全局频次表;
- 最后只把全局高频片段装进内存,继续后面的合并训练。
SentencePiece 官方已把这套流程封装好:
当语料 > 几十 GB 时,只要打开--train_extremely_large_corpus=true,
它就会自动外排 + 分段训练,峰值内存只有几百 MB 。
一句话总结
“外排训练”就是:
先外排序,再分段读,内存只留高频表,大语料也能把 BPE 跑下来。