K通道并归算法的优化实现
排序大规模的数据常用到外部并归排序。此算法分为两个步骤:首先把要处理的数据分成若干个区块,对每个区块载入内存进行排序;第二步则是把排序好的区块进行并归融合。我用此文章来介绍下一个优化的并归算法,称为K通道并归算法。该算法可以使用一次并归处理,就可以有效的融合K个区块。
对于比较少的数据进行排序,一般是直接载入主内存,进行快速排序。大多数主流语言都提供了API库支持:比如.NET运行库的Array.Sort函数,Java运行库的Arrays.sort函数注1;简单的调用API库函数即可排序好数据。
但是在实际情况中,有时我们需要对大规模的数据进行排序。比如处理一个搜索引擎爬虫产生的数据,可能一天有几个GB的数据量,甚至上TB的数据。把这些数据全部加载入内存进行快速排序是不现实的。所以才有了外部并归排序算法。
假设我们要处理的数据是存储在一个外部文件中。文件的每一行都保存两列数据。第一列是ID,是一个32 bit int,第二列是Frequency,是一个64 bit int。列之间以跳格符 \t 分隔开,行之间以换行符 \n 分隔开。这个文件的头几行数据大概类似:
53 523 35 35 153 2510 58 9134 803 7091
要求是:1、必须把此文件的每一行数据按照第一列进行排序;2、把排序好的数据存到另外一个不同的文件。
简单介绍下第一个步骤:把要处理的数据分成若干个区块,对每个区块载入内存进行排序。我用C#来实现代码。
假设我们用如下的类来表示每一行的数据:
class Entry : IComparable<Entry>
{
public int Id { get; set; }
public long Freq { get; set; }
public int CompareTo(Entry other) {
return this.Id - other.Id;
}
public Entry(string line) {
string[] tokens = line.Split('\t');
this.Id = Int32.Parse(tokens[0]);
this.Freq = Int32.Parse(tokens[1]);
}
public override string ToString() {
return this.Id + "\t" + this.Freq;
}
}
每次载入100万行数据,进行排序,然后把排序好的数据写入一个临时文件里面。代码如下:
private const int INPUT_BUFFER_SIZE = 1024 * 1024 * 100;
private const int OUTPUT_BUFFER_SIZE = 1024 * 1024 * 100;
private const int SORT_BUFFER_SIZE = 1000000;
private void Split(string inputFile, string tmpFilePattern) {
using (FileStream fsin = new FileStream(inputFile, FileMode.Open))
using (BufferedStream bfsin = new BufferedStream(fsin, INPUT_BUFFER_SIZE))
using (StreamReader sr = new StreamReader(bfsin)) {
List<Entry> sortBuffer = new List<Entry>(SORT_BUFFER_SIZE);
int fcounter = 0;
string tmpFile = null;
while (ReadBlock(sr, sortBuffer) > 0) {
tmpFile = String.Format(tmpFilePattern, fcounter++);
SaveBlock(sortBuffer, tmpFile);
sortBuffer.Clear();
}
}
}
private int ReadBlock(StreamReader reader, List<Entry> buff) {
string line = null;
int counter = 0;
while ((line = reader.ReadLine()) != null) {
buff.Add(new Entry(line));
counter++;
}
return counter;
}
private void SaveBlock(List<Entry> buffer, string outputFile) {
using (FileStream fsout = new FileStream(outputFile, FileMode.Create))
using (BufferedStream bfsout = new BufferedStream(fsout, OUTPUT_BUFFER_SIZE))
using (StreamWriter sw = new StreamWriter(bfsout)) {
string line = null;
foreach (Entry entry in buffer) {
line = entry.ToString();
sw.WriteLine(line);
}
sw.Flush();
}
}
接下来是第二个步骤:并归产生的若干个区块。我们已知每个区块都已经是排序好的。我们会同时打开K个文件,进行并归操作。所以这个算法叫做K通道并归算法。
以如下的3个输入流做例子。每一个输入流都是已经排序好的整数。首先我们打开3个指针,指向每一个流的第一个元素。

找到指针指向最小的值,把它写入输出流。然后把最小值所在的流的指针前进一位。例子中最小的值是1,所在的流是第一个流。这一步操作之后,3个流变成了如下所示:

重复前一个步骤:找到指针指向最小的值,把它写入输出流。然后把最小值所在的流的指针前进一位。现在中最小的值是2,所在的流是第一个流。这一步操作之后,3个流变成了如下所示:

重复前一个步骤。最小的值是2,所在的流是第二个流。这一步操作之后,3个流变成了如下所示:

重复前一个步骤。最小的值是3,所在的流是第三个流。这一步操作之后,3个流变成了如下所示:

如此反复,一直到读取完所有的输入流。这个时候,并归的输出流的数据就是已经完全排序好了。
现在有个问题:并归的每一步操作都需要从K个输入流中找出最小的值。最简单的实现,就是把每一个输入流给扫描一遍以找到最小值。这样每一步骤会消耗O(K),假设并归之后总共有N个值,那么整个并归会消耗O(KN)。显然比较慢,不可取。可以有更优的解法么?
答案是使用优先队列(Priority Queue)这个数据结构。给定K个数,最小优先队列能够:在O(1)的时间里面找到最小值,在O(logK)的时间里面移除最小值,在O(logK)的时间里插入一个新的值。这样,每一步会消耗O(logK),对于总共N个值的并归操作,总的消耗是O(NlogK)。.NET运行库现在并没有提供优先队列的实现注2。
K通道并归实现的代码如下:
private const int MEGRE_FILE_READ_BUFFER = 1024 * 1024 * 10;
private const int MEGRE_FILE_WRITE_BUFFER = 1024 * 1024 * 50;
public void KwayMerge(string[] tmpFiles, string outputFile) {
EntryEnumerator[] ee = null;
StreamWriter fout = null;
try {
ee = OpenEnumerators(tmpFiles);
fout = OpenOutputFile(outputFile);
Merge(ee, fout);
} finally {
foreach (EntryEnumerator e in ee) {
if (e != null)
e.Dispose();
}
if (fout != null)
fout.Dispose();
}
}
private void Merge(EntryEnumerator[] ee, StreamWriter fout) {
PriorityQueue<EntryEnumerator> mergeBuffer = new PriorityQueue<EntryEnumerator>();
foreach (EntryEnumerator e in ee) {
e.MoveNext();
mergeBuffer.Add(e);
}
while (mergeBuffer.Count >= 1) {
EntryEnumerator e = mergeBuffer.Remove();
Entry key = e.Current;
fout.WriteLine(key.ToString());
if (e.MoveNext())
mergeBuffer.Add(e);
}
if (mergeBuffer.Count > 0) {
EntryEnumerator lastOne = mergeBuffer.Min;
do {
Entry key = lastOne.Current;
fout.WriteLine(key.ToString());
} while (lastOne.MoveNext());
}
fout.Flush();
}
OpenEnumerators和OpenOutputFile的代码如下:
private EntryEnumerator[] OpenEnumerators(string[] tmpFiles) {
EntryEnumerator[] ee = new EntryEnumerator[tmpFiles.Length];
for (int i = 0; i < tmpFiles.Length; i++) {
FileStream fs = new FileStream(tmpFiles[i], FileMode.Open);
BufferedStream bfs = new BufferedStream(fs, MEGRE_FILE_READ_BUFFER);
StreamReader sr = new StreamReader(bfs);
ee[i] = new EntryEnumerator(sr, i);
}
return ee;
}
private StreamWriter OpenOutputFile(string file) {
FileStream fs = new FileStream(file, FileMode.CreateNew);
BufferedStream bfs = new BufferedStream(fs, MEGRE_FILE_WRITE_BUFFER);
return new StreamWriter(bfs);
}
EntryEnumerator和Entry的代码如下:
class EntryEnumerator : IEnumerator<Entry>, IComparable<EntryEnumerator>, IEquatable<EntryEnumerator>
{
private StreamReader sr;
private int srId;
private Entry current;
public EntryEnumerator(StreamReader sr, int srId) {
this.sr = sr;
this.srId = srId;
}
public bool MoveNext() {
string line = sr.ReadLine();
if (line == null) {
current = null;
return false;
} else {
current = new Entry(line);
return true;
}
}
public Entry Current {
get {
return this.current;
}
}
public void Dispose() {
sr.Dispose();
}
object System.Collections.IEnumerator.Current {
get { return this.current; }
}
public void Reset() {
throw new NotImplementedException();
}
public int CompareTo(EntryEnumerator other) {
int comp = this.current.CompareTo(other.current);
if (comp != 0)
return comp;
return this.srId - other.srId;
}
public bool Equals(EntryEnumerator other) {
if (other == null)
return false;
return this.current.Equals(other.current) &&
this.srId == other.srId;
}
public override int GetHashCode() {
return this.srId;
}
public override bool Equals(object obj) {
return this.Equals(obj as EntryEnumerator);
}
}
class Entry : IComparable<Entry>, IEquatable<Entry>
{
public int Id { get; set; }
public long Freq { get; set; }
public Entry(string line) {
string[] tokens = line.Split('\t');
this.Id = Int32.Parse(tokens[0]);
this.Freq = Int32.Parse(tokens[1]);
}
public int CompareTo(Entry other) {
return this.Id - other.Id;
}
public bool Equals(Entry other) {
if (other == null)
return false;
return this.Id == other.Id &&
this.Freq == other.Freq;
}
public override bool Equals(object obj) {
return this.Equals(obj as Entry);
}
public override int GetHashCode() {
return this.Id;
}
public override string ToString() {
return this.Id + "\t" + this.Freq;
}
}
_______________________
注1:Java的排序实现有点意思,对原始型别(比如int,float,byte)是执行的快速排序,而对象型别则是进行并归排序。
注2:可能.NET运行库里面最像优先队列的是SortedSet了。我查看了微软的源代码,它的取得最小值操作的时候,对整个Tree做一个InOrder的遍历,时间消耗是O(n)。有意思的是,Mono运行库里面对SortedSet的实现,却是优先队列;获取Min的操作是O(logn)。