前几天发帖/t/785801讨论匹配大量 ip 的方法,经过几天的思考,把最终使用的方法分享给大家。
比较有名的服务商有:ipip(付费), maxmind (付费),纯真 (免费)。
但在这个应用场景下,我们并不需要具体的位置信息,类似的方案会浪费不必要的内存因此放弃。
后面两个方法有个前提:ip 地址列表中大部分是连续的。
这里我们已有了国内 ip 地址列表(已有开源的库,很好找,另外我用的这个库已经把 ip 合并为了 CIDR 格式的地址)。
我们先通过二进制把 ip 转为可直接比较的数字,再把连续的 ip 变为 (start_ip, end_ip)
这样的集合,就可以利用二分法快速查找了。
import ipcalc
class ChinaIp:
def __init__(self):
self.data = []
def load(self, cidr_file='data/china_ip_list.txt'):
with open(cidr_file, 'r')as f:
for s in f.readlines():
self.add(s.strip())
def add(self, cidr):
n = ipcalc.Network(cidr)
self.data.append((n.host_first().ip, n.host_last().ip))
def search(self, ip):
l = 0
r = len(self.data) - 1
while l <= r:
mid = (l + r) // 2
if self.data[mid][0] <= ip <= self.data[mid][1]:
return True
elif self.data[mid][0] > ip:
r = mid - 1
elif self.data[mid][1] < ip:
l = mid + 1
else:
return False
return False
def __contains__(self, item):
ip = ipcalc.IP(item).ip
return self.search(ip)
china_ip = ChinaIp()
china_ip.load()
print('223.70.163.83' in china_ip)
CIDR
是形如 x.x.x.x/n
这样的地址,它表示一组网络地址相同的 ip,其中 n 表示前 n 位作为网络地址。
根据 CIDR 的特性,我们可以得到这样的结论:同一 CIDR 下的 ip,其网络地址是相同的。
因此我们可以把所有国内 cidr 地址的网络地址取出,放字典;对于一个 ip,尝试可能的网络地址(即 n ),看其是否在字典中。
import ipcalc
class ChinaIp(object):
def __init__(self):
self.data = {}
def load(self, cidr_files='data/china_ip_list.txt'):
with open(cidr_files, 'r')as f:
cidr_list = f.readlines()
for cidr in cidr_list:
self.insert(cidr.strip())
def insert(self, cidr):
network = ipcalc.Network(cidr)
self.data[str(network.netmask())]=True
def search(self, netmask: str) -> bool:
for i in netmask.split('.'):
node = node.children.get(i, None)
if node is None:
return False
if node.is_end:
return True
return False
def __contains__(self, ip):
for i in range(1,33):
if self.search(str(ipcalc.Network(f'{ip}/{i}').netmask())):
return True
return False
china_ip = ChinaIp()
china_ip.load()
print('223.70.163.83' in china_ip)
这个算法看起来没啥毛病,但实际测试中速度比第二种慢了很多,耗时的地方在比较时必须循环所有 n,而二分法可以快速的排除不可能的部分。
对于这种情况,有两种优化方法:
class ChinaIp(object):
...
def __contains__(self, ip):
l = list(range(1, 33))
random.shuffle(l)
for i in l:
if self.search(str(ipcalc.Network(f'{ip}/{i}').netmask())):
return True
return False
这种方法在测试中,时间减少了一半多。
class ChinaIp(object):
def __init__(self):
...
self.mask_list = []
...
def insert(self, cidr):
network = ipcalc.Network(cidr)
self.data[str(network.netmask())] = True
self.mask_set.add(network.mask)
def __contains__(self, ip):
for i in self.mask_set:
netmask = str(ipcalc.Network(f'{ip}/{i}').netmask())
if netmask in self.data:
return True
return False
这样优化后速度和第二种持平,不过实际应用中还需要根据 ip 列表的情况来判断需要用哪种。
_
1
westoy 2021-06-29 19:25:47 +08:00
netaddr 大法好
|
2
westoy 2021-06-29 19:26:00 +08:00
我靠, 又是我, 真是有缘.....
|
3
a719114136 OP @westoy 又是你在摸鱼
|
4
sujin190 2021-06-29 20:38:49 +08:00
|
5
aureole999 2021-06-29 22:57:07 +08:00
CIDR 的方法如果建一个 Trie 呢?
如果测试的 ip 不在 china_ip 里的话不管你随不随机 n 那肯定要查找 32 次,Trie 的话只要 ip 段不是极其零碎的话应该比较次数少点?我瞎猜的 |
6
RicardoY 2021-06-29 23:16:35 +08:00 via iPhone
为什么不用字典树…
|
7
fxrocks 2021-06-30 07:29:09 +08:00 via Android
字典不是直接 d[k]查询 v 么或者 d.get(k)?
没必要自己写个 2 分法吧 |
8
a719114136 OP @fxrocks 一共 3 亿个 ip
|
9
a719114136 OP |
10
a719114136 OP @sujin190 就是第三种方法,但实际测试中这样方法比二分慢很多
|
11
mywaiting 2021-06-30 08:51:53 +08:00
模仿一下路由器查找路由表的实现?
感觉这货应该有一个很成熟的算法实现 |
12
djj0809 2021-06-30 09:14:46 +08:00 via iPhone
@a719114136 建立二进制的 trie tree 啊,算法最差只需要比较 32 次就可以了
|
13
jinliming2 2021-06-30 09:20:12 +08:00
想到一个算法,和上面 3. 利用 CIDR 的特性 应该一样,没测试:
1,将 IP cidr 转换为 unsigned int 整数,比如 39.128.0.0/10 转为 662700032/10 2,根据 cidr 长度分别存到不同的 dict/list 中,比如:{ 10: [662700032, 796917760], 15: [1431699456] }(存 list 就排好序,便于后面去查找) 3,查询的时候 for cidr in cidr_list 遍历每一个 cidr 表,最多 32 个。假如上面这个 10 和 15 。 4,把要查询的 IP 转为 unsigned int 整数,根据 cidr 特性,使用按位与 & 运算,仅保留数字的前 cidr 位。 5,到对应的 dict/list 中查询这个数字。 |
14
missdeer 2021-06-30 09:57:57 +08:00
|
15
a719114136 OP @djj0809 比较 32 次比二分慢很多,方法 3 就是用类似的思路。
|
16
a719114136 OP @mywaiting 当时也考虑过,但没搜到。linux 自己倒是用了 ipset,但我看他里面貌似用的是 bitmap,hash 等结构。
另外路由器上的方案一般都要考虑通用性,也不会添加过多的规则。 对于这个场景 ip 是有规律的,因此有更好的方法。 |
17
aureole999 2021-06-30 10:17:13 +08:00
@a719114136 不啊,全换成 2 进制就可以啊,像 /10 的话 Trie 里只有 10 层,比较前 10 位就 ok 了。
|
18
aureole999 2021-06-30 10:24:39 +08:00
实际上都是大段大段的 ip,Trie 基本不会比较那么多次。但你的第三种方法在测试不在 china_ip 里的 ip 时一定要比较 32 次。
|
19
sujin190 2021-06-30 10:25:00 +08:00
@a719114136 #10 并不完全是,你这个第三种,如果这个 ip 不在里边并不能保证一次判断就能得出结果
ip 网段最重要特性就是掩码越短,网络范围越大,所以只要找出所有 ip 的全部可能掩码长度,然后在收集每个掩码长度下每个 ip 网段信息,判断的时候按掩码长度从小到大,如果需要判断的 ip 不在列表里,第一次就能得出结果,存在的也可以再优化到再判断一次就能得出结果 |
20
RicardoY 2021-06-30 10:27:17 +08:00 via iPhone
@a719114136 建 01 字典树…
|
21
dndx 2021-06-30 10:43:13 +08:00
trie 是正解,Linux 内核的路由表就是这么实现的,绝对错不了:
https://elixir.bootlin.com/linux/latest/source/net/ipv4/fib_trie.c |
22
shyrock 2021-06-30 10:47:45 +08:00
布隆过滤器?
|
23
nuk 2021-06-30 11:06:01 +08:00
前缀树啊,速度最快没有之一
|
24
no1xsyzy 2021-06-30 11:23:50 +08:00
@shyrock 布隆只能精确匹配,这里要前缀匹配
当然未必不可以把 /10 的 2^22 个地址全塞进去,但这显然空间效率感人。 不过也不排除某套 hash 算法组合可以极高效地解决这个问题,但找到这套 hash 算法组合的效率更感人。 |
25
lsylsy2 2021-06-30 11:30:38 +08:00
Tong Yang, Gaogang Xie, YanBiao Li, Qiaobin Fu, Alex X. Liu, Qi Li, and Laurent Mathy. 2014. Guarantee IP lookup performance with FIB explosion. In Proceedings of the 2014 ACM conference on SIGCOMM (SIGCOMM '14). Association for Computing Machinery, New York, NY, USA, 39–50. DOI:https://doi.org/10.1145/2619239.2626297
https://dl.acm.org/doi/pdf/10.1145/2619239.2626297 大学组里的老师做的,CPU 实现自己做一个效果很不错 |
27
Goooogle 2021-06-30 12:46:22 +08:00
刚好做过一样的,说一下实现逻辑
- 国内 IP 可以认为是多个 IP 段组成,转成类似于 1.1.1.1 -> 1.2.3.4 的结构,相邻的 IP 段可以合并 - IP 转成 unsigned int,那么一个 IP 段可以表示为类似于[1024, 8192]的数据 - 以 1024 为 Key,8192 为 Value,存放到 TreeMap 中 - 在查询时,将传入的 IP 也转成 unsigned int,然后去 TreeMap 查询小于等于 IP 的 Key,然后再判断 Value 是否大于等于 IP 即可 时间复杂度 O(logn),空间的话 O(ip 段数) |
28
imn1 2021-06-30 14:23:40 +08:00
如果已经有 start/end 的分段数据,bisect 好像代码更简单,不用写那么长
|
29
dqzcwxb 2021-06-30 16:08:27 +08:00
trie 树,也就是 DFA 算法可解决大数据量关键字匹配问题
|
30
wellsc 2021-06-30 16:09:48 +08:00
又到了喜闻乐见的无脑推布隆过滤器时间了
|
31
shuianfendi6 2021-06-30 16:31:25 +08:00
类似 ls 做法
1. TreeMap<Comparable>,Comparable 包含起始 ip 2. 直接用 TreeRangeMap |
32
a719114136 OP @aureole999 那怎么知道哪些 ip 需要比较前 10 位,哪些比钱前 8 位呢
|
33
a719114136 OP |
34
shyrock 2021-06-30 17:51:51 +08:00
@a719114136 #33 布隆确实有误判率,但是误判率可以通过调整位数和哈希函数的数量来降低。就看这个应用到底允许多高的误判率了。
|
35
aureole999 2021-06-30 17:53:08 +08:00
@a719114136 建树的时候这个地址段是 /10 的,那到第 10 层这个叶子结点的时候下面就没有了,自然只比较 10 次就可以判断了。如果你的 ip list 里所有地址都是 /32 的,那在判断属于 china_ip 的 ip 的时候,你就要比较 32 次了,但不属于 china_ip 的地方还是大段连续的,可能比较几次就能出结果了。
|