要么改变世界,要么适应世界

当我们要对100亿的数据排序,内存无法一次性装下数据时,怎么办?(附代码)

2024-03-10 09:31:19
0
目录

估计不少人会被问到这个问题,这个其实是外排序问题,不同于内排序全程都可以在内存中进行,外排序一般数据量巨大,当然也是有解决方案的,解决方案不外乎几种:

  1. 分块读取,然后插入数据库中,让数据库帮我们排序,我们分块查询数据库,然后分块读取保存进文件(这种方法简单直接,技术难度没那么大,但是对数据库服务器要求比较高)
  2. 分治法,将原始大文件切割成小文件,使得小文件内的数据能够一次性加载进内存,小文件内的数据先排序,然后每次从这些小文件中读取若干数据出来,使用归并排序合并成大的文件。
  3. 如果这些数据的范围都不大,则有很多数据是重复的,我们使用计数排序即可。

不过方法三有较大的局限性,如果某个数字重复非常多,那么有可能在计数过程就溢出了(当然你可能还会说:就算是有100亿个重复的,C语言中使用long long类型来保存计数那就不存在溢出了,可是如果不止100亿呢,而且每个数字范围还很大,超过数组的下标范围呢,例如C语言中数组的下标是int类型的)。

下面演示一下方法二的整个实操过程。

问题描述与分析

我们先正式描述一下我们的问题:

假设我们有100亿左右的数据,每个数字范围是8字节表示的有符号数,对该数据升序排序,使用C语言完成,你只能使用4GB的内存。

分析:数字范围是8字节表示,则计数排序无法完成,因为C语言数组的下标必须是一个int类型的,法二无法使用;100 0000 0000 * 8 = 800 0000 0000 B,约为74GB,远大于能够使用的内存,但是不影响,我们把文件切割小一点即可。

整个过程主要分成三个阶段:

  1. 将原始文件切割成若干段,每段占用内存m字节(当然最后一段可能不足m字节)要求每段能够一次性读入内存,这些文件一共n个,先把小文件内的数组排序。
  2. 一次性对所有n个文件读取l字节的数据,要求(n+1)*l不能超过内存上限,对这n段进行归并排序(n路归并排序),因为归并排序的结果也要耗内存,因此我们要求(n+1)*l不能超过内存上限,而不是n*l,归并结果保存到文件中
  3. 从之前的位置开始,继续读取这n个文件的下一段l字节的数据,重复上述过程,将归并排序追加到末尾,直至所有的数据都读取完毕

环境准备

我们首先需要准备100亿的数据,为了表示一般性,我们可以准备多于100亿的数据,数据就以二进制方式存放,每8个字节作为一个元素,元素之间也不需要什么间隔符了。

C语言中int类型最大的正数是0x7ffffff,约为1亿多,如果我们定义这么大的缓冲区,然后循环100次,即可生成100多亿的数据,使用C语言即可完成数据的准备:

// generate.c
#include<stdio.h>
#include<stdlib.h>
#include<time.h>
#define MAX_LEN 0x7ffffff
#define PRIME 19
#define LL long long

LL nums[MAX_LEN];
int  main(){
    srand((unsigned)time(NULL));
    // 覆盖式写入MAX_LEN个数字,以二进制方式
    FILE *fp = fopen("./mount_data/data.data", "wb");
    LL num1, num2;
    int postCnt = 0;
    if (fp != NULL){
        printf("open\n");
        for(int st = 1; st <= 100; st++){
            postCnt = 0;
            for(int i = 0; i < MAX_LEN;i++){
                // 产生一个随机数字 范围比 [-0x7ffffff, 0x7ffffff] 还大 
                num1 = (LL)(rand()) * 100;
                num2 = (LL)(rand()) * 100;
                nums[i] = num1 - num2;
                if(nums[i] > 0){
                    postCnt++;
                }
            }
            printf("st = %d, postCnt = %d\n", st, postCnt);
            fwrite(nums, sizeof(LL), MAX_LEN, fp);
        }
        // 再写入几个数字,制造切割不均匀现象
        printf("add some\n");
        for(int i = 0; i < PRIME; i++){
            nums[i] = (LL)(rand()) * (LL)(0x7ffffff);
        }
        fwrite(nums, sizeof(LL), PRIME, fp);
        printf("finish!\n");
        fclose(fp);
    } else{
        printf("can't open file\n");
    }

    return  0;
}

从控制台中,我们可以看到正数负数几乎是对半的:

open
st = 1, postCnt = 67107155
st = 2, postCnt = 67119290
st = 3, postCnt = 67108807
st = 4, postCnt = 67105100
st = 5, postCnt = 67118299
st = 6, postCnt = 67106095
st = 7, postCnt = 67105083
st = 8, postCnt = 67107339
st = 9, postCnt = 67100382
...

我们来看一下数据大小:

du -sh mount_data/data.data
100G    mount_data/data.data

排序实施

切分数据并排序小文件

我们能用4GB的内存,但是我们不能切分文件为4GB,这是因为程序运行过程也要内存,因此我们可以先试着把数据切分成2GB的大小,2GB的数据可以一次性加载了,2GB的大小能够存放的long long 类型数据个数:

>>> 2 * 1024 * 1024 * 1024 / 8
268435456.0

但是已经超过int表示范围了,为了方便我们使用数组进行后续排序,我们把单个文件限制成1GB的,这个其实影响不大,读取磁盘次数翻倍了而已。

>>> 1 * 1024 * 1024 * 1024 / 8
134217728.0

刚好是4字节int类型最大能表示的范围。

因此我们需要每次最多读取上述个数的数字出来排序,然后这些数据作为一个数据块,写入到文件中,排序我们使用的是快速排序。

// split_sort.c
#include<stdio.h>
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<time.h>
#define MAX_LEN 0x7ffffff
#define LL long long

LL nums[MAX_LEN];

void qSort(int l, int r){
    if (l >= r) return;
    int i = l, j = i + 1;
    LL key = nums[l], tmp;
    for (; j <= r; j++) {
        if (nums[j] < key) {
            i++;
            // swap(i, j);
            tmp = nums[i];
            nums[i] = nums[j];
            nums[j] = tmp;
        }
    }
    // swap(l, i);
    tmp = nums[i];
    nums[i] = nums[l];
    nums[l] = tmp;
    qSort(l, i - 1);
    qSort(i + 1, r);
}

int  main(){
    char sizeStr[20];
    char currentFile[40];
    // 只读方式打开二进制文件
    FILE *fpOrigin = fopen("./mount_data/data.data", "rb");
    // 写方式打开文本文件
    FILE *fpSize =  fopen("./mount_data/size.txt", "w");
    // 用于标识第几个文件
    int cnt = 0;
    size_t size;
    if(fpOrigin == NULL){
        printf("can't open file ./mount_data/data.data \n");
        return 0;
    }
    if(fpSize == NULL){
        printf("can't open file ./mount_data/size.txt \n");
        return 0;
    }

    printf("open\n");
    while((size = fread(nums, sizeof(LL), MAX_LEN, fpOrigin)) > 0){
        // 记录下当前文件中包含的元素个数
        sprintf(sizeStr, "%lud\n", size);
        fwrite(sizeStr,strlen(sizeStr), sizeof(char), fpSize);
        // 使用快速排序对当前小文件的数据排序
        qSort(0, size);
        sprintf(currentFile, "./mount_data/split/%d.data", cnt++);
        // 写方式打开二进制文件
        FILE *currentFp =  fopen(currentFile, "wb");
        if(currentFp == NULL){
            printf("can't open file %s \n", currentFile);
            continue;
        }
        // 写入实际读取到的数据个数
        fwrite(nums, sizeof(LL), size, currentFp);
        fclose(currentFp);
        printf("finish split %d \n", cnt - 1);
    } 
    printf("finish split all \n");
    fclose(fpOrigin);
    fclose(fpSize);

    return  0;
}

我们可以编写一个测试的文件看看我们的小文件是否已经排序完成:

// check_sort.c
#include<stdio.h>
#define FILE_CNT 101
#define MIN(a,b) ((a) < (b) ? (a) : (b))
#define MAX_LEN 0x7ffffff
#define LL long long

LL nums[MAX_LEN];
int main(){
    char currentFile[40];
    size_t size;
    int ok = 1;
    for(int i = 0; i < FILE_CNT;i++){
        sprintf(currentFile, "./mount_data/split/%d.data", i);
        FILE *currentFp =  fopen(currentFile, "rb");
        if(currentFp != NULL){
            ok = 1;
            // 检查每个文件的数据
            size = fread(nums, sizeof(LL), MAX_LEN, currentFp);
            for(int i = 1; i < size;i++){
                if(nums[i] < nums[i - 1]){
                    ok = 0;
                }
            }
            printf("%s : %d\n", currentFile, ok);
            fclose(currentFp);
        } else{
            printf("%s open fail!\n", currentFile);
        }
    }
    printf("check finish\n");
    return 0;
}

当所有的输出都是1时,即代表这些小文件内是有序的。

归并排序,合并成最终数据

我们总共有101个文件,根据之前的分析,我们要满足:

(101 + 1) * l <= 2GB

则需要满足l<=21053761B,这些内存能存放8字节的数据的个数是21053761 / 8 = 2631720

n路归并排序这一部分的大体逻辑是:

维护n小文件的个缓冲区(这里n=101),每个缓冲区都有一个读取指针,由于小文件已经有序,因此缓冲区中的内容也必然有序,每次从这些候选缓冲区中找出最小的值,被选中的缓冲区指针往后移,这便是归并排序,无需过多说明。

选取过程我们可以用优先队列,但是c语言手写优先队列比较繁琐,也不是本文的目的,使用优先队列带来的提升只是由原先的$O(101)$变为$O(\log 101)\approx 7$,并非是数量级别上的提升,因此就使用循环的方式。

每当有缓冲区读取完毕时,我们都需要从对应的文件中继续读取数据到缓冲区中,且每当存放最终结果的缓冲区被写满后我们也要写入文件中。

// merge_sort.c
#include<stdio.h>
#define FILE_CNT 101
#define LL long long
#define MAX_HANDLE_CNT 2631720

typedef struct {
    // 存放小文件的文件指针
    FILE* splitFileFp;
    // 缓冲区,存放读取出来的一段数据
    LL nums[MAX_HANDLE_CNT];
    // 当前缓冲区大小
    int bufSize;
    // 缓冲区已经读取的数据个数(读取指针)
    int readCnt;
    // 是否已经读取完所有数据
    int readFinish;
}FileHelper;
FileHelper fileHelpers[FILE_CNT];
typedef struct {
    // 存放最终文件的文件指针
    FILE* fileFp;
    // 缓冲区,暂时存放写入的一段数据
    LL nums[MAX_HANDLE_CNT];
    // 缓冲区已经写入的数据个数(写入指针)
    int writeCnt;
}ResultHelper;
ResultHelper resultHelper;
int main() {
    char currentFile[40];
    // 打开全部小文件并初始化缓冲区
    for (int i = 0; i < FILE_CNT;i++) {
        sprintf(currentFile, "./mount_data/split/%d.data", i);
        fileHelpers[i].splitFileFp = fopen(currentFile, "rb");
        if (fileHelpers[i].splitFileFp == NULL) {
            printf("%s open fail!\n", currentFile);
            return 0;
        }
        // 预读取数据到缓冲区中
        fileHelpers[i].bufSize = fread(fileHelpers[i].nums, sizeof(LL), MAX_HANDLE_CNT, fileHelpers[i].splitFileFp);
        fileHelpers[i].readCnt = 0;
        fileHelpers[i].readFinish = 0;
    }
    // 打开最终文件并初始化缓冲区
    resultHelper.fileFp = fopen("./mount_data/result.data", "wb");
    if (resultHelper.fileFp == NULL) {
        printf("./mount_data/result.data open fail!\n");
        return 0;
    }
    resultHelper.writeCnt = 0;

    // 已经读取完毕的文件个数
    int finishCnt = 0;
    int minIdx = 0;
    LL minValue = (LL)(0x7fffffffffffffff);
    printf("start merge\n");
    while(finishCnt < FILE_CNT){
        // 每次都从这一系列缓冲区中选出一个最小的元素,被选到的缓冲区的指针往后移
        // 通过循环的方式 找出当前最小值
        minValue = (LL)(0x7fffffffffffffff);
        for(int i = 0; i < FILE_CNT; i++){
            if(fileHelpers[i].readFinish == 1){
                // 如果第 i 个文件已经读取完毕,则跳过
                continue;
            }
            // 如果第 i 个文件的缓冲区的当前最小值比所记录的还小,则更新
            if(fileHelpers[i].nums[fileHelpers[i].readCnt] <= minValue){
                minIdx = i;
                minValue = fileHelpers[i].nums[fileHelpers[i].readCnt];
            }
        }
        // 写入目标文件的缓冲区
        resultHelper.nums[resultHelper.writeCnt] = minValue;
        resultHelper.writeCnt++;
        // 目标文件缓冲区已满
        if (resultHelper.writeCnt >= MAX_HANDLE_CNT){
            printf("result buffer full, save to file\n");
            fwrite(resultHelper.nums, sizeof(LL), resultHelper.writeCnt, resultHelper.fileFp);
            resultHelper.writeCnt = 0;
        }
        // 更新刚刚选中的缓冲区
        fileHelpers[minIdx].readCnt++;
        // 如果缓冲区都读完了,则继续从文件中读进
        if(fileHelpers[minIdx].readCnt >= fileHelpers[minIdx].bufSize){
            printf("%d.data buffer empty, load from file\n", minIdx);
            fileHelpers[minIdx].bufSize = fread(fileHelpers[minIdx].nums, sizeof(LL), MAX_HANDLE_CNT, fileHelpers[minIdx].splitFileFp);
            // 如果不能读进新的内容,说明该文件已经全部读取完毕
            if (fileHelpers[minIdx].bufSize <= 0){
                fileHelpers[minIdx].readFinish = 1;
                finishCnt++;
                printf("finish %d.data\n", minIdx);
            }
            // 重置读取指针
            fileHelpers[minIdx].readCnt = 0;
        }
    }
    // 如果目标文件的缓冲区有内容尚未写入文件,则写入
    if(resultHelper.writeCnt > 0){
        fwrite(resultHelper.nums, sizeof(LL), resultHelper.writeCnt, resultHelper.fileFp);
        resultHelper.writeCnt = 0;
    }
    // 关闭所有文件
    for (int i = 0; i < FILE_CNT;i++) {
        fclose(fileHelpers[i].splitFileFp);
    }
    fclose(resultHelper.fileFp);
    printf("finish all!\n");
    return 0;
}

最后,让我们来测试一下我们的结果:

// check_ans.c
#include<stdio.h>
#define MAX_LEN 0x7ffffff
#define LL long long
#define MIN(a,b) ((a) < (b) ? (a) : (b))
#define MAX(a,b) ((a) > (b) ? (a) : (b))
LL nums[MAX_LEN];

int main(){
    FILE *fp = fopen("./mount_data/result.data", "rb");
    if(fp == NULL){
        printf("open fail!\n");
        return 0;
    }
    size_t size;
    int check = 1;
    LL tmp;
    // // 读出第一个
    fread(&tmp, sizeof(LL), 1, fp);
    int th = 0;
    printf("start\n");
    while(1){
        size = fread(nums, sizeof(LL), MAX_LEN, fp);
        if(size <= 0){
            break;
        }
        if(nums[0] < tmp){
            check = 0;
            printf("ERROR 1 ! %lld < %lld\n", nums[0], tmp);
            return 0;
        }
        for(int i = 1; i < size; i++){
            if(nums[i] < nums[i - 1]){
                check = 0;
                printf("ERROR 2 ! %lld < %lld\n", nums[i], nums[i - 1]);
                return 0;
            }
        }
        printf("th = %d , size = %lu, check = %d\n", th++, size, check);
        tmp = nums[size - 1];
    }
    fclose(fp);
    printf("%d\n", check);
    return 0;
}

当输出一系列的1时候,则代表我们的程序无误了!

历史评论
开始评论