什么是并查集(Union Find)?

并查集算法是用来解决动态连通性(Dynamic Connectivity)问题的一种算法,那么,什么是所谓的动态连通性呢?我们可以举个栗子,通过这个栗子,将能很快速的理解什么是动态连通性。

动态连通性(Dynamic Connectivity)

举个例子,有一群人,他们之间有若干好友关系。如果两个人有直接或者间接好友关系,那么我们就说他们在同一个朋友圈中,这里解释下,如果Alice是Bob好友的好友,或者好友的好友的好友等等,即通过若干好友可以认识,那么我们说Alice和Bob是间接好友。

union-find-friend

如上图,A和B是好友关系,C和D是好友关系,那么我们可以说A和B是具有连通性的,A和B组成了一个连通分量(Connected Component),同理,C和D也组成了一个连通分量。
那么,A和D成为好友之后,B可以通过A认识D和C这两个好友,C可以通过D认识A和B这两个好友,我们可以说A、B、C、D之间具有连通性,无论是间接的还是直接的。
这就叫动态连通性,那么我们将A和D连接起来的操作可以称为合并操作,合并操作在我们上面的例子中可以解释为将A与B所在的连通分量和C与D的连通分量进行合并。
那么在进行合并操作之前,我们如何知道合并与被合并的对象是否是具有连通性呢?这就是查找操作。
那么可以说,合并(Union) & 查找(Find)构成了并查集算法。

数据结构与API设计

数据结构是为了解决问题我们想要实现的方法规范,所以数据结构与良好的API设计是必要的。

1
2
3
4
5
6
7
8
9
10
public interface IUnionFind {
// Constructor
UnionFind(int initialNum);

// union operation
void union(int arg1, int arg2);

// query connected
boolean connected(int arg1, int arg2);
}

Quick Find

现在我们来实现解决动态连通性问题的算法的第一种实现,叫做快速查找算法。

quick-find

我们使用数组来作为我们的数据结构,如果p和q是连通的,当且仅当他们在数组中的项也是一样的。例如上面这张图,包含了三个连通分量{0 5 6}{1 2 7}{3 4 8 9}
下面是详细的实现代码,你也可以在我的Github仓库找到此代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package net.sunzhenyu.miscellaneous.dsa.other.unionfind;

/**
* UnionFind Interface
*
* @author <a href="mailto:sunzhenyucn@gmail.com">SunZhenyu</a>
* @version 0.0.1
* @date 2018/4/16
* @since 1.8
*/
public interface IUnionFind {

void union(int arg1, int arg2);

boolean connected(int arg1, int arg2);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package net.sunzhenyu.miscellaneous.dsa.other.unionfind;

import lombok.extern.slf4j.Slf4j;

/**
* UnionFind Interface Impl
*
* @author <a href="mailto:sunzhenyucn@gmail.com">SunZhenyu</a>
* @version 0.0.1
* @date 2018/4/16
* @since 1.8
*/
@Slf4j
public class UnionFind implements IUnionFind {

private int[] arr;

public int[] getArr() {
return arr;
}

public UnionFind(int initialNum) {
if (initialNum <= 0) {
throw new IllegalArgumentException("The initial number:" + initialNum + "is illegal.");
}
arr = new int[initialNum];
for (int i = 0; i < initialNum; i++) {
arr[i] = i;
}
}

@Override
public void union(int arg1, int arg2) {
int arg1Num = this.arr[arg1];
int arg2Num = this.arr[arg2];
for (int i = 0; i < this.arr.length; i++) {
if (this.arr[i] == arg1Num) {
this.arr[i] = arg2Num;
}
}
}

@Override
public boolean connected(int arg1, int arg2) {
return arr[arg1] == arr[arg2];
}


public void print(String type) {
StringBuilder content = new StringBuilder(type);
for (int i = 0; i < this.arr.length; i++) {
content.append(arr[i]).append(" ");
}
log.info(content.toString());
}
}

测试代码以及结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package net.sunzhenyu.miscellaneous.dsa;

import lombok.extern.slf4j.Slf4j;
import net.sunzhenyu.miscellaneous.dsa.other.unionfind.UnionFind;
import org.junit.Test;

@Slf4j
public class UnionFindTest {

@Test
public void unionFindTest() {
UnionFind uf = new UnionFind(10);
uf.union(3, 9);
uf.print("After union(3, 9), ");
uf.union(9, 8);
uf.print("After union(9, 8), ");
uf.union(3, 4);
uf.print("After union(3, 4), ");
}

}

union-find-junit-test.png

这样我们就完成了快速查找算法的详细设计与实现,但是很不幸的是这个算法的效率是很慢很慢的,甚至于是无法接受的。因为初始化和合并操作都要整体遍历一次数组,所以时间复杂度取决于数组的大小,如果在数据量大的情况下,每次合并操作都需要遍历一次数组这是不现实的,下面我们来看一种优化过的算法:快速合并算法(Quick Union)。

Quick Union

快速合并算法的设计是所谓的“懒策略”,我们尽量会去避免计算直到不得不计算的情况。
我们还是使用和快速查找算法同样的数据结构,不过现在数组中的数据含义有了变化,代表的是当前对应节点的父节点,整体呈现出一个树的结构。

quick-union-1

利用这个数据结构我们可以将每个记录与一个根节点相联系,我们可以从子节点一路找到根节点,一旦我们可以找得到根节点,我们相应的就可以这样实现查找操作:通过检查根节点是否相同,如果相同,就代表它们是处于同一个连通分量的。
合并操作就更加简单了,假如我们如下图有两个连通分量p和q,需要合并操作的话我们只需要将p的根节点记录值改为q的根节点记录值就可以了。这就是快速合并算法。

quick-union-2

下面我们来看看代码层面的实现与测试结果,同样的,你可以在我的Github仓库找到此代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package net.sunzhenyu.miscellaneous.dsa.other.unionfind;

/**
* QuickUnion Interface
*
* @author <a href="mailto:sunzhenyucn@gmail.com">SunZhenyu</a>
* @version 0.0.1
* @date 2018/4/16
* @since 1.8
*/
public interface IQuickUnion {

int root(int arg1);

boolean connected(int arg1, int arg2);

void union(int arg1, int arg2);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package net.sunzhenyu.miscellaneous.dsa.other.unionfind;

import lombok.extern.slf4j.Slf4j;

/**
* QuickUnion Impl
*
* @author <a href="mailto:sunzhenyucn@gmail.com">SunZhenyu</a>
* @version 0.0.1
* @date 2018/4/16
* @since 1.8
*/
@Slf4j
public class QuickUnion implements IQuickUnion {

private int[] arr;

public QuickUnion(int initialNum) {
if (initialNum <= 0) {
throw new IllegalArgumentException("The initial number:" + initialNum + "is illegal.");
}
arr = new int[initialNum];
for (int i = 0; i < arr.length; i++) {
arr[i] = i;
}
}

@Override
public int root(int arg1) {
while (arg1 != arr[arg1]) {
arg1 = arr[arg1];
}
return arg1;
}

@Override
public boolean connected(int arg1, int arg2) {
return root(arg1) == root(arg2);
}

@Override
public void union(int arg1, int arg2) {
int arg1Num = root(arg1);
int arg2Num = root(arg2);
arr[arg1Num] = arg2Num;
}

public void print(String type) {
StringBuilder content = new StringBuilder(type);
for (int i = 0; i < this.arr.length; i++) {
content.append(arr[i]).append(" ");
}
log.info(content.toString());
}
}

测试代码与结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package net.sunzhenyu.miscellaneous.dsa;

import lombok.extern.slf4j.Slf4j;
import net.sunzhenyu.miscellaneous.dsa.other.unionfind.QuickUnion;
import org.junit.Test;

@Slf4j
public class QuickUnionTest {

@Test
public void quickUnionTest() {
QuickUnion qu = new QuickUnion(10);
qu.union(1, 2);
qu.print("After union(1, 2), ");
qu.union(3, 1);
qu.print("After union(3, 1), ");
}
}

quick-union-junit-test.png

很可惜我们这个快速合并算法还是比较慢,相对于快速查找算法来说是另一种慢,查找操作在某些情况下会很慢,这是因为每次子节点的查找操作都会回溯整颗树,这在树比较大的情况下是不可想象的。

Weighted Quick Union

接下来我们来看一种基于Quick Union的改进算法,加权快速合并算法(Weighted Quick Union),这种算法相对于上面两种,更加适合于解决大型的动态连通性问题。
那么什么是加权呢?我们在Quick Union算法中进行合并操作的时候,将两个连通分量树进行合并,会导致在数据量比较大的情况下,出现高树,我们在Weighted Quick Union算法中合并操作的时候,需要比对两个连通分量之间的分量数目,保证分量数目小的树永远处于分量数目大的树的子节点,且保证合并之后每个子节点追溯到根节点的步数保持在4步以内,这就叫加权。
那么如何在Quick Union算法的基础上来实现Weighted Quick Union算法呢?
我们首先需要两个数组来支撑起我们整个算法的数据结构,第二个数组需要用来保存每个连通分量下的分量数目。
接下来我们需要修改union(int arg1, int arg2)方法来保证在合并的过程中对连通分量加以权重。
完整代码可在我的Github仓库找到

1
2
3
4
5
6
7
8
9
10
11
12
int arg1Num = root(arg1);
int arg2Num = root(arg2);
if (arg1Num == arg2Num) {
return;
}
if (size[arg1Num] < size[arg2Num]) {
arr[arg1Num] = arg2Num;
size[arg2Num] += size[arg1Num];
} else {
arr[arg2Num] = arg1Num;
size[arg1Num] += size[arg2Num];
}

接下来我们看下测试代码与结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package net.sunzhenyu.miscellaneous.dsa;

import lombok.extern.slf4j.Slf4j;
import net.sunzhenyu.miscellaneous.dsa.other.unionfind.WeightedQuickUnion;
import org.junit.Test;

@Slf4j
public class WeightedQuickUnionTest {

@Test
public void weightedQuickUnionTest() {
WeightedQuickUnion wqu = new WeightedQuickUnion(10);
wqu.union(2, 3);
wqu.print("After union(2, 3), ");
wqu.union(2, 4);
wqu.print("After union(2, 4), ");
wqu.union(4, 8);
wqu.print("After union(4, 8), ");
}
}

wqu-result.png

后话

那么这里就算将并查集算法初步学习完毕,学习算法最主要的是了解一个算法性能与不足和改进之处,重要的是为什么而不是怎么做。

补充:使用路径压缩进一步优化算法

当我们在指定的连通分量中的子节点追溯到根节点的过程中,我们可以接触到从当前节点到根节点的每一个节点,我们可以试图将路径上每一个子节点都指向根节点。这样的操作是需要额外的常数时间的。这能够保证我们下次回溯的时候从任意节点追溯到根节点的路径都不会太远。

pc.png
pc2.png

我们只需要在root(int arg1)方法中添加一行代码既可以实现。

1
2
3
4
5
6
7
8
@Override
public int root(int arg1) {
while (arg1 != arr[arg1]) {
arr[arg1] = arr[arr[arg1]];
arg1 = arr[arg1];
}
return arg1;
}