当我们要对100亿的数据排序,内存无法一次性装下数据时,怎么办?(附代码)
估计不少人会被问到这个问题,这个其实是外排序问题,不同于内排序全程都可以在内存中进行,外排序一般数据量巨大,当然也是有解决方案的,解决方案不外乎几种:
- 分块读取,然后插入数据库中,让数据库帮我们排序,我们分块查询数据库,然后分块读取保存进文件(这种方法简单直接,技术难度没那么大,但是对数据库服务器要求比较高)
- 分治法,将原始大文件切割成小文件,使得小文件内的数据能够一次性加载进内存,小文件内的数据先排序,然后每次从这些小文件中读取若干数据出来,使用归并排序合并成大的文件。
- 如果这些数据的范围都不大,则有很多数据是重复的,我们使用计数排序即可。
不过方法三有较大的局限性,如果某个数字重复非常多,那么有可能在计数过程就溢出了(当然你可能还会说:就算是有100亿个重复的,C语言中使用long long
类型来保存计数那就不存在溢出了,可是如果不止100亿呢,而且每个数字范围还很大,超过数组的下标范围呢,例如C语言中数组的下标是int类型的)。
下面演示一下方法二的整个实操过程。
问题描述与分析
我们先正式描述一下我们的问题:
假设我们有100亿左右的数据,每个数字范围是8字节表示的有符号数,对该数据升序排序,使用C语言完成,你只能使用4GB的内存。
分析:数字范围是8字节表示,则计数排序无法完成,因为C语言数组的下标必须是一个int类型的,法二无法使用;100 0000 0000 * 8 = 800 0000 0000 B
,约为74GB,远大于能够使用的内存,但是不影响,我们把文件切割小一点即可。
整个过程主要分成三个阶段:
- 将原始文件切割成若干段,每段占用内存
m
字节(当然最后一段可能不足m
字节)要求每段能够一次性读入内存,这些文件一共n
个,先把小文件内的数组排序。 - 一次性对所有
n
个文件读取l
字节的数据,要求(n+1)*l
不能超过内存上限,对这n
段进行归并排序(n路归并排序),因为归并排序的结果也要耗内存,因此我们要求(n+1)*l
不能超过内存上限,而不是n*l
,归并结果保存到文件中 - 从之前的位置开始,继续读取这
n
个文件的下一段l
字节的数据,重复上述过程,将归并排序追加到末尾,直至所有的数据都读取完毕
环境准备
我们首先需要准备100亿的数据,为了表示一般性,我们可以准备多于100亿的数据,数据就以二进制方式存放,每8个字节作为一个元素,元素之间也不需要什么间隔符了。
C语言中int类型最大的正数是0x7ffffff
,约为1亿多,如果我们定义这么大的缓冲区,然后循环100次,即可生成100多亿的数据,使用C语言即可完成数据的准备:
// generate.c
#include<stdio.h>
#include<stdlib.h>
#include<time.h>
#define MAX_LEN 0x7ffffff
#define PRIME 19
#define LL long long
LL nums[MAX_LEN];
int main(){
srand((unsigned)time(NULL));
// 覆盖式写入MAX_LEN个数字,以二进制方式
FILE *fp = fopen("./mount_data/data.data", "wb");
LL num1, num2;
int postCnt = 0;
if (fp != NULL){
printf("open\n");
for(int st = 1; st <= 100; st++){
postCnt = 0;
for(int i = 0; i < MAX_LEN;i++){
// 产生一个随机数字 范围比 [-0x7ffffff, 0x7ffffff] 还大
num1 = (LL)(rand()) * 100;
num2 = (LL)(rand()) * 100;
nums[i] = num1 - num2;
if(nums[i] > 0){
postCnt++;
}
}
printf("st = %d, postCnt = %d\n", st, postCnt);
fwrite(nums, sizeof(LL), MAX_LEN, fp);
}
// 再写入几个数字,制造切割不均匀现象
printf("add some\n");
for(int i = 0; i < PRIME; i++){
nums[i] = (LL)(rand()) * (LL)(0x7ffffff);
}
fwrite(nums, sizeof(LL), PRIME, fp);
printf("finish!\n");
fclose(fp);
} else{
printf("can't open file\n");
}
return 0;
}
从控制台中,我们可以看到正数负数几乎是对半的:
open
st = 1, postCnt = 67107155
st = 2, postCnt = 67119290
st = 3, postCnt = 67108807
st = 4, postCnt = 67105100
st = 5, postCnt = 67118299
st = 6, postCnt = 67106095
st = 7, postCnt = 67105083
st = 8, postCnt = 67107339
st = 9, postCnt = 67100382
...
我们来看一下数据大小:
du -sh mount_data/data.data
100G mount_data/data.data
排序实施
切分数据并排序小文件
我们能用4GB的内存,但是我们不能切分文件为4GB,这是因为程序运行过程也要内存,因此我们可以先试着把数据切分成2GB的大小,2GB的数据可以一次性加载了,2GB的大小能够存放的long long 类型数据个数:
>>> 2 * 1024 * 1024 * 1024 / 8
268435456.0
但是已经超过int
表示范围了,为了方便我们使用数组进行后续排序,我们把单个文件限制成1GB的,这个其实影响不大,读取磁盘次数翻倍了而已。
>>> 1 * 1024 * 1024 * 1024 / 8
134217728.0
刚好是4字节int类型最大能表示的范围。
因此我们需要每次最多读取上述个数的数字出来排序,然后这些数据作为一个数据块,写入到文件中,排序我们使用的是快速排序。
// split_sort.c
#include<stdio.h>
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<time.h>
#define MAX_LEN 0x7ffffff
#define LL long long
LL nums[MAX_LEN];
void qSort(int l, int r){
if (l >= r) return;
int i = l, j = i + 1;
LL key = nums[l], tmp;
for (; j <= r; j++) {
if (nums[j] < key) {
i++;
// swap(i, j);
tmp = nums[i];
nums[i] = nums[j];
nums[j] = tmp;
}
}
// swap(l, i);
tmp = nums[i];
nums[i] = nums[l];
nums[l] = tmp;
qSort(l, i - 1);
qSort(i + 1, r);
}
int main(){
char sizeStr[20];
char currentFile[40];
// 只读方式打开二进制文件
FILE *fpOrigin = fopen("./mount_data/data.data", "rb");
// 写方式打开文本文件
FILE *fpSize = fopen("./mount_data/size.txt", "w");
// 用于标识第几个文件
int cnt = 0;
size_t size;
if(fpOrigin == NULL){
printf("can't open file ./mount_data/data.data \n");
return 0;
}
if(fpSize == NULL){
printf("can't open file ./mount_data/size.txt \n");
return 0;
}
printf("open\n");
while((size = fread(nums, sizeof(LL), MAX_LEN, fpOrigin)) > 0){
// 记录下当前文件中包含的元素个数
sprintf(sizeStr, "%lud\n", size);
fwrite(sizeStr,strlen(sizeStr), sizeof(char), fpSize);
// 使用快速排序对当前小文件的数据排序
qSort(0, size);
sprintf(currentFile, "./mount_data/split/%d.data", cnt++);
// 写方式打开二进制文件
FILE *currentFp = fopen(currentFile, "wb");
if(currentFp == NULL){
printf("can't open file %s \n", currentFile);
continue;
}
// 写入实际读取到的数据个数
fwrite(nums, sizeof(LL), size, currentFp);
fclose(currentFp);
printf("finish split %d \n", cnt - 1);
}
printf("finish split all \n");
fclose(fpOrigin);
fclose(fpSize);
return 0;
}
我们可以编写一个测试的文件看看我们的小文件是否已经排序完成:
// check_sort.c
#include<stdio.h>
#define FILE_CNT 101
#define MIN(a,b) ((a) < (b) ? (a) : (b))
#define MAX_LEN 0x7ffffff
#define LL long long
LL nums[MAX_LEN];
int main(){
char currentFile[40];
size_t size;
int ok = 1;
for(int i = 0; i < FILE_CNT;i++){
sprintf(currentFile, "./mount_data/split/%d.data", i);
FILE *currentFp = fopen(currentFile, "rb");
if(currentFp != NULL){
ok = 1;
// 检查每个文件的数据
size = fread(nums, sizeof(LL), MAX_LEN, currentFp);
for(int i = 1; i < size;i++){
if(nums[i] < nums[i - 1]){
ok = 0;
}
}
printf("%s : %d\n", currentFile, ok);
fclose(currentFp);
} else{
printf("%s open fail!\n", currentFile);
}
}
printf("check finish\n");
return 0;
}
当所有的输出都是1时,即代表这些小文件内是有序的。
归并排序,合并成最终数据
我们总共有101个文件,根据之前的分析,我们要满足:
(101 + 1) * l <= 2GB
则需要满足l<=21053761B
,这些内存能存放8字节的数据的个数是21053761 / 8 = 2631720
。
n路归并排序这一部分的大体逻辑是:
维护n
小文件的个缓冲区(这里n=101
),每个缓冲区都有一个读取指针,由于小文件已经有序,因此缓冲区中的内容也必然有序,每次从这些候选缓冲区中找出最小的值,被选中的缓冲区指针往后移,这便是归并排序,无需过多说明。
选取过程我们可以用优先队列,但是c语言手写优先队列比较繁琐,也不是本文的目的,使用优先队列带来的提升只是由原先的$O(101)$变为$O(\log 101)\approx 7$,并非是数量级别上的提升,因此就使用循环的方式。
每当有缓冲区读取完毕时,我们都需要从对应的文件中继续读取数据到缓冲区中,且每当存放最终结果的缓冲区被写满后我们也要写入文件中。
// merge_sort.c
#include<stdio.h>
#define FILE_CNT 101
#define LL long long
#define MAX_HANDLE_CNT 2631720
typedef struct {
// 存放小文件的文件指针
FILE* splitFileFp;
// 缓冲区,存放读取出来的一段数据
LL nums[MAX_HANDLE_CNT];
// 当前缓冲区大小
int bufSize;
// 缓冲区已经读取的数据个数(读取指针)
int readCnt;
// 是否已经读取完所有数据
int readFinish;
}FileHelper;
FileHelper fileHelpers[FILE_CNT];
typedef struct {
// 存放最终文件的文件指针
FILE* fileFp;
// 缓冲区,暂时存放写入的一段数据
LL nums[MAX_HANDLE_CNT];
// 缓冲区已经写入的数据个数(写入指针)
int writeCnt;
}ResultHelper;
ResultHelper resultHelper;
int main() {
char currentFile[40];
// 打开全部小文件并初始化缓冲区
for (int i = 0; i < FILE_CNT;i++) {
sprintf(currentFile, "./mount_data/split/%d.data", i);
fileHelpers[i].splitFileFp = fopen(currentFile, "rb");
if (fileHelpers[i].splitFileFp == NULL) {
printf("%s open fail!\n", currentFile);
return 0;
}
// 预读取数据到缓冲区中
fileHelpers[i].bufSize = fread(fileHelpers[i].nums, sizeof(LL), MAX_HANDLE_CNT, fileHelpers[i].splitFileFp);
fileHelpers[i].readCnt = 0;
fileHelpers[i].readFinish = 0;
}
// 打开最终文件并初始化缓冲区
resultHelper.fileFp = fopen("./mount_data/result.data", "wb");
if (resultHelper.fileFp == NULL) {
printf("./mount_data/result.data open fail!\n");
return 0;
}
resultHelper.writeCnt = 0;
// 已经读取完毕的文件个数
int finishCnt = 0;
int minIdx = 0;
LL minValue = (LL)(0x7fffffffffffffff);
printf("start merge\n");
while(finishCnt < FILE_CNT){
// 每次都从这一系列缓冲区中选出一个最小的元素,被选到的缓冲区的指针往后移
// 通过循环的方式 找出当前最小值
minValue = (LL)(0x7fffffffffffffff);
for(int i = 0; i < FILE_CNT; i++){
if(fileHelpers[i].readFinish == 1){
// 如果第 i 个文件已经读取完毕,则跳过
continue;
}
// 如果第 i 个文件的缓冲区的当前最小值比所记录的还小,则更新
if(fileHelpers[i].nums[fileHelpers[i].readCnt] <= minValue){
minIdx = i;
minValue = fileHelpers[i].nums[fileHelpers[i].readCnt];
}
}
// 写入目标文件的缓冲区
resultHelper.nums[resultHelper.writeCnt] = minValue;
resultHelper.writeCnt++;
// 目标文件缓冲区已满
if (resultHelper.writeCnt >= MAX_HANDLE_CNT){
printf("result buffer full, save to file\n");
fwrite(resultHelper.nums, sizeof(LL), resultHelper.writeCnt, resultHelper.fileFp);
resultHelper.writeCnt = 0;
}
// 更新刚刚选中的缓冲区
fileHelpers[minIdx].readCnt++;
// 如果缓冲区都读完了,则继续从文件中读进
if(fileHelpers[minIdx].readCnt >= fileHelpers[minIdx].bufSize){
printf("%d.data buffer empty, load from file\n", minIdx);
fileHelpers[minIdx].bufSize = fread(fileHelpers[minIdx].nums, sizeof(LL), MAX_HANDLE_CNT, fileHelpers[minIdx].splitFileFp);
// 如果不能读进新的内容,说明该文件已经全部读取完毕
if (fileHelpers[minIdx].bufSize <= 0){
fileHelpers[minIdx].readFinish = 1;
finishCnt++;
printf("finish %d.data\n", minIdx);
}
// 重置读取指针
fileHelpers[minIdx].readCnt = 0;
}
}
// 如果目标文件的缓冲区有内容尚未写入文件,则写入
if(resultHelper.writeCnt > 0){
fwrite(resultHelper.nums, sizeof(LL), resultHelper.writeCnt, resultHelper.fileFp);
resultHelper.writeCnt = 0;
}
// 关闭所有文件
for (int i = 0; i < FILE_CNT;i++) {
fclose(fileHelpers[i].splitFileFp);
}
fclose(resultHelper.fileFp);
printf("finish all!\n");
return 0;
}
最后,让我们来测试一下我们的结果:
// check_ans.c
#include<stdio.h>
#define MAX_LEN 0x7ffffff
#define LL long long
#define MIN(a,b) ((a) < (b) ? (a) : (b))
#define MAX(a,b) ((a) > (b) ? (a) : (b))
LL nums[MAX_LEN];
int main(){
FILE *fp = fopen("./mount_data/result.data", "rb");
if(fp == NULL){
printf("open fail!\n");
return 0;
}
size_t size;
int check = 1;
LL tmp;
// // 读出第一个
fread(&tmp, sizeof(LL), 1, fp);
int th = 0;
printf("start\n");
while(1){
size = fread(nums, sizeof(LL), MAX_LEN, fp);
if(size <= 0){
break;
}
if(nums[0] < tmp){
check = 0;
printf("ERROR 1 ! %lld < %lld\n", nums[0], tmp);
return 0;
}
for(int i = 1; i < size; i++){
if(nums[i] < nums[i - 1]){
check = 0;
printf("ERROR 2 ! %lld < %lld\n", nums[i], nums[i - 1]);
return 0;
}
}
printf("th = %d , size = %lu, check = %d\n", th++, size, check);
tmp = nums[size - 1];
}
fclose(fp);
printf("%d\n", check);
return 0;
}
当输出一系列的1时候,则代表我们的程序无误了!
本文由「黄阿信」创作,创作不易,请多支持。
如果您觉得本文写得不错,那就点一下「赞赏」请我喝杯咖啡~
商业转载请联系作者获得授权,非商业转载请附上原文出处及本链接。
关注公众号,获取最新动态!