haproxy

分類 负载均衡, haproxy

简介:HAProxy是一个免费的负载均衡软件,可以运行于大部分主流的Linux操作系统上。HAProxy提供了**L4(TCP)和L7(HTTP)**两种负载均衡能力,具备丰富的功能。HAProxy的社区非常活跃,版本更新快速,HAProxy具备媲美商用负载均衡器的性能和稳定性。因为HAProxy的上述优点,它当前不仅仅是免费负载均衡软件的首选,更几乎成为了唯一选择。

下面我将从3个方面来讲解从而帮助大家快速上手玩转haproxy

1.了解haproxy

1.1.环境配置

主机名IP服务
client172.25.254.10客户端
haproxy

172.25.254.100

192.168.132.100

haproxy服务端
server1192.168.132.101web服务器 httpd
server2192.168.132.102web服务器 nginx

1.2 了解haproxy配置文件

1
2
3
4
yum install haproxy
cd /etc/haproxy
#查看我们的配置文件
vim haproxy.cfg

1.2.1 global 全局配置

除去上面我们还可以有以下几种

nbproc N开启的haproxy worker 进程数,默认为1个
nbthread 1指定每个haproxy进程开启的线程数,默认一个进程一个线程
cpu-map A B绑定进程指定cpu,将A进程绑定到Bcpu上
log IP local2 info 定义全局的syslog服务器但日志服务器需要开启udp协议最多定义2个
maxss|conn N每个进程ssl最大连接数,用于hproxy配置了证书的场景下
maxconnrate N每个进程每秒创建的最大连接数量
spread-checks N后端server状态check随机提前或延迟百分比时间,建议2-5(20%-50%)之间,默认值0
pidfile指定pid文件路径

1.2.2 默认配置

所有“listen”和“backend”部分(如果在它们的块中没有指定)都将使用的常见默认值

1.2.3 配置文件区

1
2
pstree -p | grep haproxy
#查看我们的进程前提是服务开启

2. 基础配置

2.1 简单来一段基础配置

check 开机检测异常情况

inter    健康状态检查间隔时间,默认为2s

fall        后端服务器从线上转为线下的连续监测失败次数,默认为3次

rise        后端服务器从线下恢复上线的连续监测有效次数,默认为2次

weight       权重,权重越大使用越多

backup  后端服务器标记为备份状态,只有主非备份主机死亡时启用

disable  将后端服务器标记为不可用

redirect prefix/redir  将请求临时302重定向到其他URL只适用于http服务

2.2 采用listen"合体"写法

2.3 来尝试一下标红功能

首先我们在server1上下载一个nginx并写入内容外加改端口号避免和我们的httpd冲突

vim /etc/nginx/nginx.conf

由于web1 和web2都被打上了不可用标签所以当我们访问就会被带到百度

这个时候我们就会访问到我们101上的8080端口

3 算法

很多时候我们要处理的情况很复杂单靠上面的配置我们很难完成我们想要的效果,于是乎就出现了各种各样的算法

3.1 静态算法

静态算法:按照事先定义好的规则轮询公平调度,不关心后端服务器的当前负载、连接数和响应速度等,且无法实时修改权重(只能为0和1,不支持其它值),只能靠重启HAProxy生效。

3.1.1 static-rr 基于权重的轮询调度

不支持运行时利用socat进行权重的动态调整(只支持0和1,不支持其它值)
不支持端服务器慢启动
其后端主机数量没有限制,相当于LVS中的 wrr

3.1.2 first

根据服务器在列表中的位置,自上而下进行调度
其只会当第一台服务器的连接数达到上限,新请求才会分配给下一台服务
其会忽略服务器的权重设置
不支持用socat进行动态修改权重,可以设置0和1,可以设置其它值但无效

3.2 动态算法

基于后端服务器状态进行调度适当调整,
新请求将优先调度至当前负载较低的服务器
权重可以在haproxy运行时动态调整无需重启

3.2.1 roundrobin

1.基于权重的轮询动态调度算法
2.支持权重的运行时调整,不同于Ivs中的rr轮训模式

3.HAProxy中的roundrobin支持慢启动(新加的服务器会逐渐增加转发数)

4.其每个后端backend中最多支持4095个real server,
5.支持对real server权重动态调整,
6.roundrobin为默认调度算法,此算法使用广泛

会被权重影响可以理解为static-rr的升级版

3.2.2 leastconn

leastconn加权的最少连接的动态
支持权重的运行时调整和慢启动,即:根据当前连接最少的后端服务器而非权重进行优先调度(新客
户端连接)
比较适合长连接的场景使用,比如:MySQL等场景:

balance leastconn

3.3 其他算法

3.3.1 source

源地址hash,基于用户源地址hash并将请求转发到后端服务器,后续同一个源地址请求将被转发至同-个后端web服务器。此方式当后端服务器数据量发生变化时,会导致很多用户的请求转发至新的后端服务器,默认为静态方式,但是可以通过hash-type支持的选项更改这个算法一般是在不插入Cookie的TCP模式下使用,也可给拒绝会话cookie的客户提供最好的会话粘性,适用于session会话保持但不支持cookie和缓存的场景源地址有两种转发客户端请求到后端服务器的服务器选取计算方式,分别是取模法和一致性hash

3.3.1.1 map-base 取模法

map-based:取模法,对source地址进行hash计算,再基于服务器总权重的取模,最终结果决定将此请求转发至对应的后端服务器。
此方法是静态的,即不支持在线调整权重,不支持慢启动,可实现对后端服务器均衡调度缺点是当服务器的总权重发生变化时,即有服务器上线或下线,都会因总权重发生变化而导致调度结果整体改变,hash-type 指定的默认值为此算法

balance source

3.3.1.2 一致性hash

一致性哈希,当服务器的总权重发生变化时,对调度结果影响是局部的,不会引起大的变动
该hash算法是动态的,支持使用 socat等工具进行在线权重调整,支持慢启动

balance source

hash-type consistent

3.3.2 uri

基于对用户请求的URI的左半部分或整个uri做hash,再将hash结果对总权重进行取模后
根据最终结果将请求转发到后端指定服务器
适用于后端是缓存服务器场景
默认是静态算法,也可以通过hash-type指定map-based和consistent,来定义使用取模法还是一致性hash
注意:此算法基于应用层,所以只支持 mode http,不支持 mode tcp

uri就类似一个独一无二的身份标识

url就是每次我们输入网址访问某个网站时,浏览器上输入的那一行内容。比如:http://baidu.com这是一个url,每个链接地址是一个url。

  • URI = Uniform Resource Identifier 统一资源标志符
  • URL = Uniform Resource Locator 统一资源定位符

具体关于url uri的关系本文不做详细解答

balance uri

3.3.3 url_param

url_param对用户请求的url中的 params 部分中的一个参数key对应的value值作hash计算,并由服务器总权重相除以后派发至某挑出的服务器,后端搜索同一个数据会被调度到同一个服务器,多用于电商通常用于追踪用户,以确保来自同一个用户的请求始终发往同一个realserver如果无没key,将按roundrobin算法

balance uri

hash-type consistent

后续访问不在变换内容

3.3.4 hdr

3.3.4.1 hdr取模法

针对用户每个http头部(header)请求中的指定信息做hash,此处由 name 指定的http首部将会被取出并做hash计算,然后由服务器总权重取模以后派发至某挑出的服务器,如果无有效值,则会使用默认的轮询调度。

balance hdr(User-Agent)

3.3.4.2 hdr一致性hash

balance hdr(User-Agent)

hash-type consistent

curl -v

curl -vA s手动更改user-agent

4  高级功能及配置

4.1 基于cookie的会话保持

cookie value:为当前server指定cookie值,实现基于cookie的会话黏性,相对于基于source 地址hash 调度算法对客户端的粒度更精准,但同时也加大了haproxy负载,目前此模式使用较少,已经被session共享服务器代替
注意:不支持 tcp mode,使用 http mode

cookie namerewrite insertprefixindirect i[ nocache i postonly]preserve ][httponly ][secure ][ domain ]*[ maxidle ][ maxlife ]
name:#cookie的key名称,用于实现持久连接

insert:#插入新的cookie,默认不插入cookie
indirect:#如果客户端已经有cookie,则不会再发送cookie信息

nocache:#当client和hapoxy之间有缓存服务器(如:CDN)时,不允许中间缓存器缓存cookie,#因为这会导致很多经过同一个CDN的请求都发送到同一台后端服务器

4.2 haproxy状态页

通过web界面,显示当前HAProxy的运行状态

stats enable#基于默认的参数启用stats page
stats hide-version#将状态页中haproxy版本隐藏
stats refresh #设定自动刷新时间间隔,默认不自动刷新
stats uri #自定义stats page uri,默认值:/haproxy?statsstats auth :#认证时的账号和密码,可定义多个用户,每行指定一个用户

#默认:no authentication
stats admin{ifunless } #启用stats page中的管理功能

4.3 ip 透传

web服务器中需要记录客户端的真实IP地址,用于做访问统计、安全防护、行为分析、区域排行等场景。

4.3.1 layer7

七层:协议+内容交换

七层负载均衡服务器起了一个反向代理服务器的作用,服务器建立一次TCP连接要三次握手,而client要访问Web Server要先与七层负载设备进行三次握手后建立TCP连接,把要访问的报文信息发送给七层负载均衡;然后七层负载均衡再根据设置的均衡规则选择特定的 Web Server,然后通过三次握手与此台Web Server建立TCP连接,然后Web Server把需要的数据发送给七层负载均衡设备,负载均衡设备再把数据发送给client;所以,七层负载均衡设备起到了代理服务器的作用,七层代理需要和Client和后端服务器分别建立连接

option forwardfor [except ][ header ][ if-none ]
[except]:请求报请来自此处指定的网络时不予添加此首部,如haproxy自身所在网络
[header ]:使用自定义的首部名称,而非“x-Forwarded-For",示例:X-clien
[if-none ]如果没有首部才添加首部,如果有使用默认值

正常情况下的日志

在由haproxy发往后端主机的请求报文中添加"X-Forwarded-For"首部,其值为前端客户端的地址;用于向后端主发送真实的客户端IP

4.3.2 layer4

四层:IP+PORT转发
在四层负载设备中,把client发送的报文目标地址(原来是负载均衡设备的IP地址),根据均衡设备设置的选择web服务器的规则选择对应的web服务器IP地址,这样client就可以直接跟此服务器建立TCP连接并发送数据,而四层负载自身不参与建立连接,而和LVS不同,haproxy是伪四层负载均衡,因为haproxy需要分别和前端客户端及后端服务器建立连接

4.4 ACL

访问控制列表ACL,Access ControlLists)
是一种基于包过滤的访问控制技术
它可以根据设定的条件对经过服务器传输的数据包进行过滤(条件匹配)即对接收到的报文进行匹配和过滤,基于请求报文头部中的源地址、源端口、目标地址、目标端口、请求方法、URL、文件后缀等信息内容进行匹配并执行进一步操作,比如允许其通过或丢弃。

4.4.1 acl的选项配置

acl             [flags]        [operator]        []

acl       名称          匹配规范         匹配模式     具体操作符    操作对象类型

#ACL名称,可以使用大字母A-Z、小写字母a-z、数字0-9、冒号:、点.、中横线和下划线,并且严格区分大小写

#定义ACL匹配规范,即:判断条件

hdr string,提取在一个HTTP请求报文的首部

hdr([[,]]):完全匹配字符串,header的指定信息,表示在多值中使用的值的出现次数
hdr_beg([[,]]):前缀匹配,header中指定匹配内容的begin

hdr_end([[,]]):后缀匹配,header中指定匹配内容end

hdr_dom([[,]]):域匹配,header中的domainname(host)

hdr_dir([[,<0CC>]]):路径匹配,header的uri路径
hdr_len([[,]]):长度匹配,header的长度匹配
hdr_reg([[,]]):正则表达式匹配,自定义表达式(regex)模糊匹配
hdr_sub([[,]]):子串匹配,header中的uri模糊匹配 模糊匹配c 洪湖报文中a/b/c
也会匹配

base 同理

base :string
#返回第一个主机头和请求的路径部分的连接,该请求从主机名开始,并在问号之前结束,对虚拟主机有用://:@#:/;#?#

path 同理

path :string
#提取请求的URL路径,该路径从第一个斜杠开始,并在问号之前结束(无主机部分)
://:@:#/;#?#

url 同理

url :string
#提取请求中的整个URL。一个典型的应用是具有预取能力的缓存,以及需要从数据库聚合多个信息并将它们保存在缓存中的网页门户入口,推荐使用path

#ACL匹配模式
-i不区分大小写
-m 使用指定的正则表达式匹配方法
-n 不做DNS解析
-u禁止ac1重名,否则多个同名ACL匹配或关系

ACL 操作符
整数比较:eq、ge、gt、1e、1t
字符比较:
-exact match  (-m str):字符串必须完全匹配模式
-substring match(m sub):在提取的字符串中查找模式,如果其中任何一个被发现,ACL将匹配

-prefix match   (-m beg):在提取的字符串首部中查找模式,如果其中任何一个被发现,ACL将匹配

-suffix match    (-mend):将模式与提取字符串的尾部进行比较,如果其中任何一个匹配,则ACL进行匹配
-subdir match   (-m dir):查看提取出来的用斜线分隔(“/“)的字符串,如其中任一个匹配,则ACL进行匹配
domain match  (-m dom):查找提取的用点(“.”)分隔字符串,如果其中任何一个匹配,则ACL进-行匹配

value的类型
The ACL engine can match these types against patterns of the following types :
-Boolean #布尔值
-integer or integer range #整数或整数范围,比如用于匹配端口范围
-IP address/network#IP地址或IP范围,192.168.0.1,192.168.0.1/24

-string–>www.timinglee.org
   exact#精确比较
   substring#子串
   suffix#后缀比较
   prefix#前缀比较
   subdir#路径,/wp-includes/js/jquery/jquery.js
   domain#域名,www.timinglee.org
-regular expression#正则表达式
-hex block#16进制

多个ACL的逻辑处理
与:隐式(默认)使用
或:使用“or”或“||“表示
否定:使用”!”表示

if

if !

4.4.2 acl 策略

acl 基于域名匹配

基于源Ip或子网调度访问

基于源地址访问控制

基于浏览器类型


基于文件后缀名实现动态分离

基于访问路径实现动静分离

总结


留言與分享

旅行商问题

分類 算法

什么是旅行商问题(TSP)?

旅行商问题 (TSP) 是理论计算机科学中的经典组合问题。该问题要求找到图中的最短路径,条件是仅访问所有节点一次并返回出发城市。

问题陈述给出了城市列表以及每个城市之间的距离。

目的: 从出发城市出发,只访问一次其他城市,然后返回出发城市。我们的目标是找到完成往返路线的最短路径。

表中的内容:

TSP 示例

这里给出了一个图表,其中 1、2、3 和 4 代表城市,与每条边相关的权重代表这些城市之间的距离。

TSP 示例

目标是找到从出发城市出发、遍历图表且仅访问其他城市或节点一次并返回出发城市的最短路径。

对于上图,最佳路线是遵循成本最低的路径: 1-2-4-3-1. 这条最短路线的费用为 10+25+30+15 =80

旅行商问题的不同解决方案

Different Solutions to Travelling Salesman Problem

旅行商问题 (TSP) 被归类为 NP 难题,因为没有多项式时间算法。随着城市数量的增加,复杂性呈指数级增长。

有多种方法可以解决旅行商问题 (tsp)。一些流行的解决方案是:

蛮力法是一种幼稚的方法 解决旅行商问题的方法。在这种方法中,我们首先计算所有可能的路径,然后进行比较。由 n 个城市组成的图中的路径数为 n! 使用这种蛮力方法解决旅行商问题的计算成本非常高。

分支限界法: 这种方法将问题分解为子问题。解决这些单独的子问题将提供最佳解决方案。

本教程将演示一种动态规划方法,即分支定界法的递归版本,用于解决旅行商问题。

动态编程 是通过分析所有可能的路线来寻求最优解的一种方法。它是解决旅行商问题的精确解决方法之一,它比 贪婪方法 提供近乎最优的解决方案。

该方法的计算复杂度为 O(N^2 * 2^N) 本文后面会讨论这一点。

最近邻法 是一种基于启发式的贪婪方法,我们选择最近的邻居节点。这种方法在计算上比动态方法更便宜。但它不能保证最优解。此方法用于近似最优解。

旅行商问题的算法

我们将使用动态规划方法来解决旅行商问题(TSP)。

在开始算法之前,让我们先熟悉一些术语:

  • 图G=(V,E),它是一组顶点和边的集合。
  • V 是顶点集。
  • E 是边集。
  • 顶点通过边连接。
  • Dist(i,j)表示两个顶点 i 和 j 之间的非负距离。

假设 S 是城市子集,属于 {1, 2, 3, …, n},其中 1、2、3…n 是城市,i、j 是该子集中的两个城市。现在 cost(i, S, j) 定义为访问 S 中节点的最短路径的长度,该路径恰好一次以 i 和 j 为起点和终点。

例如,cost (1, {2, 3, 4}, 1) 表示最短路径的长度,其中:

  • 出发城市为 1
  • 城市 2、3 和 4 仅访问过一次
  • 终点是 1

动态规划算法如下:

  • 设定 cost(i, , i) = 0,这意味着我们从 i 开始和结束,并且成本为 0。
  • 当 |S| > 1 时,我们定义 cost(i, S, 1) = ∝ 其中 i !=1 。因为最初我们不知道从城市 i 经过其他城市到达城市 1 的确切成本。
  • 现在,我们需要从 1 开始并完成游览。我们需要以这种方式选择下一个城市-

cost(i, S, j)=min cost (i, S−{i}, j)+dist(i,j),其中 i∈S 且 i≠j

对于给定的图,邻接矩阵如下:

Algorithm for Traveling Salesman Problem

距离(i,j)1234
10101520
21003525
31535030
42025300

让我们看看我们的算法是如何工作的:

步骤1) 我们考虑从城市 1 开始我们的旅程,游览其他城市一次,然后返回城市 1。

步骤2) S 是城市的子集。根据我们的算法,对于所有 |S| > 1,我们将设置距离 cost(i, S, 1) = ∝。这里 cost(i, S, j) 表示我们从城市 i 出发,访问 S 的城市一次,现在我们到达城市 j。我们将此路径成本设置为无穷大,因为我们还不知道距离。因此值将如下所示:

Cost (2, {3, 4}, 1) = ∝ ;符号表示我们从城市 2 出发,经过城市 3、4,到达城市 1。路径成本为无穷大。类似地,

成本(3,{2,4},1)=∝

成本(4,{2,3},1)=∝

步骤3) 现在,对于 S 的所有子集,我们需要找到以下内容:

cost(i, S, j)=min cost (i, S−{i}, j)+dist(i,j),其中 j∈S 且 i≠j

这意味着从 i 出发,经过城市子集一次,然后返回城市 j 的最低成本路径。考虑到旅程从城市 1 开始,最佳路径成本将为 = cost(1, {其他城市}, 1)。

让我们看看如何实现这个目标:

现在 S = {1, 2, 3, 4}。有四个元素。因此子集的数量将为 2^4 或 16。这些子集是: **1)|S| = 空:** {φ} **2)|S| = 1:** {{1}, {2}, {3}, {4}} **3)|S| = 2:** {{1, 2}, {1, 3}, {1, 4}, {2, 3}, {2, 4}, {3, 4}} **4)|S| = 3:** {{1, 2, 3}, {1, 2, 4}, {2, 3, 4}, {1, 3, 4}} **5)|S| = 4:** {{1, 2, 3, 4}}

由于我们从 1 开始,因此我们可以丢弃包含城市 1 的子集。

算法计算:

1)|S| = Φ:

成本(2,Φ,1)=距离(2,1)= 10

成本(3,Φ,1)=距离(3,1)= 15

成本(4,Φ,1)=距离(4,1)= 20

2)|S| = 1:

成本(2,{3},1)= 距离(2,3)+ 成本(3,Φ,1)= 35+15 = 50

成本(2,{4},1)= 距离(2,4)+ 成本(4,Φ,1)= 25+20 = 45

成本(3,{2},1)= 距离(3,2)+ 成本(2,Φ,1)= 35+10 = 45

成本(3,{4},1)= 距离(3,4)+ 成本(4,Φ,1)= 30+20 = 50

成本(4,{2},1)= 距离(4,2)+ 成本(2,Φ,1)= 25+10 = 35

成本(4,{3},1)= 距离(4,3)+ 成本(3,Φ,1)= 30+15 = 45

3)|S| = 2:

成本(2,{3,4},1)=最小值[dist[2,3]+成本(3,{4},1)=35+50=85,

dist[2,4]+Cost(4,{3},1) = 25+45 = 70 ] = 70

成本(3,{2,4},1)=最小值[dist[3,2]+成本(2,{4},1)=35+45=80,

dist[3,4]+Cost(4,{2},1) = 30+35 = 65 ] = 65

成本(4,{2,3},1)=最小值[dist[4,2]+成本(2,{3},1)= 25+50 = 75

dist[4,3]+Cost(3,{2},1) = 30+45 = 75 ] = 75

4)|S| = 3:

成本(1,{2,3,4},1)=最小值[dist[1,2]+成本(2,{3,4},1)=10+70=80

dist[1,3]+Cost(3,{2,4},1) = 15+65 = 80

dist[1,4]+Cost(4,{2,3},1) = 20+75 = 95 ] = 80

所以最佳解决方案是 1-2-4-3-1

Algorithm for Traveling Salesman Problem

伪代码

1
2
3
4
5
6
7
8
Algorithm: Traveling-Salesman-Problem 
Cost (1, {}, 1) = 0
for s = 2 to n do
for all subsets S belongs to {1, 2, 3, … , n} of size s
Cost (s, S, 1) = Infinity
for all i Є S and i ≠ 1
Cost (i, S, j) = min {Cost (i, S – {i}, j) + dist(i, j) for j Є S and i ≠ j}
Return min(i) Cost (i, {1, 2, 3, …, n}, j) + d(j, i)

在 C/ 中实现C++

以下是实现 C++:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include <bits/stdc++.h>
using namespace std;
#define V 4
#define MAX 1000000
int tsp(int graph[][V], int s)
{
vector<int> vertex;
for (int i = 0; i < V; i++)
if (i != s)
vertex.push_back(i);
int min_cost = MAX;
while(next_permutation(vertex.begin(), vertex.end()))
{
int current_cost = 0;
int j = s;
for (int i = 0; i < vertex.size(); i++) {
current_cost += graph[j][vertex[i]];
j = vertex[i];
}
current_cost += graph[j][s];
min_cost = min(min_cost, current_cost);
return min_cost;
}
}
int main()
{
int graph[][V] = { { 0, 10, 15, 20 },{ 10, 0, 35, 25 },{ 15, 35, 0, 30 },{ 20, 25, 30, 0 } };
int s = 0;
cout << tsp(graph, s) << endl;
return 0;
}

输出:

1
80

在实施 Python

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from sys import maxsize
from itertools, import permutations
V = 4
def tsp(graph, s):
vertex = []
for i in range(V):
if i != s:
vertex.append(i)
min_cost = maxsize
next_permutation=permutations(vertex)
for i in next_permutation:
current_cost = 0
k = s
for j in i:
current_cost += graph[k][j]
k = j
current_cost += graph[k][s]
min_cost = min(min_cost, current_cost)
return min_cost
graph = [[0, 10, 15, 20], [10, 0, 35, 25], [15, 35, 0, 30], [20, 25, 30, 0]]
s = 0
print(tsp(graph, s))

输出:

1
80

TSP 的学术解决方案

计算机科学家花了数年时间寻找一种改进的多项式时间算法来解决旅行商问题。到目前为止,这个问题仍然是 NP 难题。

尽管近年来已经发表了以下一些解决方案,在一定程度上降低了复杂性:

  • 经典的对称TSP问题采用零后缀法求解。
  • 基于生物地理学的优化算法是基于迁移策略来解决可规划为TSP的优化问题。
  • 多目标进化算法是基于NSGA-II设计用于解决多TSP问题的。
  • 多智能体系统用固定的资源解决N个城市的TSP问题。

旅行商问题的应用

旅行商问题 (TSP) 以其最纯粹的形式和修改形式应用于现实世界。其中一些是:

  • 规划、物流和制造微芯片:芯片插入问题自然出现在微芯片行业中。这些问题可以规划为旅行商问题。
  • DNA测序:旅行商问题的轻微修改可用于 DNA 测序。这里,城市代表 DNA 片段,距离代表两个 DNA 片段之间的相似性度量。
  • 天文学:天文学家利用旅行商问题是尽量减少观察各种来源所花费的时间。
  • 最优控制问题:旅行商问题公式可应用于最优控制问题。可能还会添加其他一些约束。

TSP 的复杂性分析

  • 时间复杂度: 共有2个N 每个节点有 子问题。因此子问题总数为 N*2N。每个子问题都需要线性时间来解决。如果未指定原节点,我们必须对 N 个节点运行循环。

    因此,最佳解决方案的总时间复杂度将是节点数 * 子问题数 * 解决每个子问题的时间。时间复杂度可以定义为 O(N2 *2^N)。

  • 空间复杂度: 动态规划方法使用内存来存储 C(S, i),其中 S 是顶点集的子集。总共有 2N 每个节点有 2 个子集。因此空间复杂度为 O(XNUMX^N)。

接下来,您将了解 埃拉托斯特尼筛法

留言與分享

路径规划之 A* 算法

分類 算法

A*(念做:A Star)算法是一种很常用的路径查找和图形遍历算法。它有较好的性能和准确度。本文在讲解算法的同时也会提供Python语言的代码实现,并会借助matplotlib库动态的展示算法的运算过程。

算法介绍

A*算法最初发表于1968年,由Stanford研究院的Peter Hart, Nils Nilsson以及Bertram Raphael发表。它可以被认为是Dijkstra算法的扩展。

由于借助启发函数的引导,A*算法通常拥有更好的性能。

广度优先搜索

为了更好的理解A*算法,我们首先从广度优先(Breadth First)算法讲起。

正如其名称所示,广度优先搜索以广度做为优先级进行搜索。

从起点开始,首先遍历起点周围邻近的点,然后再遍历已经遍历过的点邻近的点,逐步的向外扩散,直到找到终点。

这种算法就像洪水(Flood fill)一样向外扩张,算法的过程如下图所示:

在上面这幅动态图中,算法遍历了图中所有的点,这通常没有必要。对于有明确终点的问题来说,一旦到达终点便可以提前终止算法,下面这幅图对比了这种情况:

在执行算法的过程中,每个点需要记录达到该点的前一个点的位置 – 可以称之为父节点。这样做之后,一旦到达终点,便可以从终点开始,反过来顺着父节点的顺序找到起点,由此就构成了一条路径。

Dijkstra算法

Dijkstra算法是由计算机科学家Edsger W. Dijkstra在1956年提出的。

Dijkstra算法用来寻找图形中节点之间的最短路径。

考虑这样一种场景,在一些情况下,图形中相邻节点之间的移动代价并不相等。例如,游戏中的一幅图,既有平地也有山脉,那么游戏中的角色在平地和山脉中移动的速度通常是不相等的。

在Dijkstra算法中,需要计算每一个节点距离起点的总移动代价。同时,还需要一个优先队列结构。对于所有待遍历的节点,放入优先队列中会按照代价进行排序。

在算法运行的过程中,每次都从优先队列中选出代价最小的作为下一个遍历的节点。直到到达终点为止。

下面对比了不考虑节点移动代价差异的广度优先搜索与考虑移动代价的Dijkstra算法的运算结果:

当图形为网格图,并且每个节点之间的移动代价是相等的,那么Dijkstra算法将和广度优先算法变得一样。

最佳优先搜索

在一些情况下,如果我们可以预先计算出每个节点到终点的距离,则我们可以利用这个信息更快的到达终点。

其原理也很简单。与Dijkstra算法类似,我们也使用一个优先队列,但此时以每个节点到达终点的距离作为优先级,每次始终选取到终点移动代价最小(离终点最近)的节点作为下一个遍历的节点。这种算法称之为最佳优先(Best First)算法。

这样做可以大大加快路径的搜索速度,如下图所示:

但这种算法会不会有什么缺点呢?答案是肯定的。

因为,如果起点和终点之间存在障碍物,则最佳优先算法找到的很可能不是最短路径,下图描述了这种情况。

A*算法

对比了上面几种算法,最后终于可以讲解本文的重点:A*算法了。

下面的描述我们将看到,A*算法实际上是综合上面这些算法的特点于一身的。

A*算法通过下面这个函数来计算每个节点的优先级。

\[f(n) = g(n) + h(n)\]

其中:

  • f(n)f(n) 是节点n的综合优先级。当我们选择下一个要遍历的节点时,我们总会选取综合优先级最高(值最小)的节点。
  • g(n)g(n) 是节点n距离起点的代价。
  • h(n)h(n) 是节点n距离终点的预计代价,这也就是A*算法的启发函数。关于启发函数我们在下面详细讲解。

A*算法在运算过程中,每次从优先队列中选取f(n)f(n)值最小(优先级最高)的节点作为下一个待遍历的节点。

另外,A*算法使用两个集合来表示待遍历的节点,与已经遍历过的节点,这通常称之为open_setclose_set

完整的A*算法描述如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
* 初始化open_set和close_set;
* 将起点加入open_set中,并设置优先级为0(优先级最高);
* 如果open_set不为空,则从open_set中选取优先级最高的节点n:
* 如果节点n为终点,则:
* 从终点开始逐步追踪parent节点,一直达到起点;
* 返回找到的结果路径,算法结束;
* 如果节点n不是终点,则:
* 将节点n从open_set中删除,并加入close_set中;
* 遍历节点n所有的邻近节点:
* 如果邻近节点m在close_set中,则:
* 跳过,选取下一个邻近节点
* 如果邻近节点m也不在open_set中,则:
* 设置节点m的parent为节点n
* 计算节点m的优先级
* 将节点m加入open_set中

启发函数

上面已经提到,启发函数会影响A*算法的行为。

  • 在极端情况下,当启发函数h(n)h(n)始终为0,则将由g(n)g(n)决定节点的优先级,此时算法就退化成了Dijkstra算法。
  • 如果h(n)h(n)始终小于等于节点n到终点的代价,则A*算法保证一定能够找到最短路径。但是当h(n)h(n)的值越小,算法将遍历越多的节点,也就导致算法越慢。
  • 如果h(n)h(n)完全等于节点n到终点的代价,则A*算法将找到最佳路径,并且速度很快。可惜的是,并非所有场景下都能做到这一点。因为在没有达到终点之前,我们很难确切算出距离终点还有多远。
  • 如果h(n)h(n)的值比节点n到终点的代价要大,则A*算法不能保证找到最短路径,不过此时会很快。
  • 在另外一个极端情况下,如果h(n)h(n)相较于g(n)g(n)大很多,则此时只有h(n)h(n)产生效果,这也就变成了最佳优先搜索。

由上面这些信息我们可以知道,通过调节启发函数我们可以控制算法的速度和精确度。因为在一些情况,我们可能未必需要最短路径,而是希望能够尽快找到一个路径即可。这也是A*算法比较灵活的地方。

对于网格形式的图,有以下这些启发函数可以使用:

  • 如果图形中只允许朝上下左右四个方向移动,则可以使用曼哈顿距离(Manhattan distance)。
  • 如果图形中允许朝八个方向移动,则可以使用对角距离。
  • 如果图形中允许朝任何方向移动,则可以使用欧几里得距离(Euclidean distance)。

关于距离

曼哈顿距离

如果图形中只允许朝上下左右四个方向移动,则启发函数可以使用曼哈顿距离,它的计算方法如下图所示:

计算曼哈顿距离的函数如下,这里的D是指两个相邻节点之间的移动代价,通常是一个固定的常数。

1
2
3
4
function heuristic(node) =
dx = abs(node.x - goal.x)
dy = abs(node.y - goal.y)
return D * (dx + dy)

对角距离

如果图形中允许斜着朝邻近的节点移动,则启发函数可以使用对角距离。它的计算方法如下:

计算对角距离的函数如下,这里的D2指的是两个斜着相邻节点之间的移动代价。如果所有节点都正方形,则其值就是\\sqrt{2} \* D。

1
2
3
4
function heuristic(node) =
dx = abs(node.x - goal.x)
dy = abs(node.y - goal.y)
return D * (dx + dy) + (D2 - 2 * D) * min(dx, dy)

欧几里得距离

如果图形中允许朝任意方向移动,则可以使用欧几里得距离。

欧几里得距离是指两个节点之间的直线距离,因此其计算方法也是我们比较熟悉的:\\sqrt{(p2.x-p1.x)^2 + (p2.y-p1.y)^2}。其函数表示如下:

1
2
3
4
function heuristic(node) =
dx = abs(node.x - goal.x)
dy = abs(node.y - goal.y)
return D * sqrt(dx * dx + dy * dy)

算法实现

虽然前面介绍了很多内容,但实际上A*算法并不复杂,实现起来也比较简单。

下面我们给出一个Python语言的代码示例。

之所以使用Python语言是因为我们可以借助matplotlib库很方便的将结果展示出来。在理解了算法之后,通过其他语言实现也并非难事。

我们的算法演示的是在一个二维的网格图形上从起点找寻终点的求解过程。

坐标点与地图

首先,我们创建一个非常简单的类来描述图中的点,相关代码如下:

1
2
3
4
5
6
7
8
9
# point.py

import sys

class Point:
def __init__(self, x, y):
self.x = x
self.y = y
self.cost = sys.maxsize

接着,我们实现一个描述地图结构的类。为了简化算法的描述:

我们选定左下角坐标[0, 0]的点是算法起点,右上角坐标[size - 1, size - 1]的点为要找的终点。

为了让算法更有趣,我们在地图的中间设置了一个障碍,并且地图中还会包含一些随机的障碍。该类的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# random_map.py

import numpy as np

import point

class RandomMap:
def __init__(self, size=50): ①
self.size = size
self.obstacle = size//8 ②
self.GenerateObstacle() ③

def GenerateObstacle(self):
self.obstacle_point = []
self.obstacle_point.append(point.Point(self.size//2, self.size//2))
self.obstacle_point.append(point.Point(self.size//2, self.size//2-1))

# Generate an obstacle in the middle
for i in range(self.size//2-4, self.size//2): ④
self.obstacle_point.append(point.Point(i, self.size-i))
self.obstacle_point.append(point.Point(i, self.size-i-1))
self.obstacle_point.append(point.Point(self.size-i, i))
self.obstacle_point.append(point.Point(self.size-i, i-1))

for i in range(self.obstacle-1): ⑤
x = np.random.randint(0, self.size)
y = np.random.randint(0, self.size)
self.obstacle_point.append(point.Point(x, y))

if (np.random.rand() > 0.5): # Random boolean ⑥
for l in range(self.size//4):
self.obstacle_point.append(point.Point(x, y+l))
pass
else:
for l in range(self.size//4):
self.obstacle_point.append(point.Point(x+l, y))
pass

def IsObstacle(self, i ,j): ⑦
for p in self.obstacle_point:
if i==p.x and j==p.y:
return True
return False

这段代码说明如下:

  1. 构造函数,地图的默认大小是50x50;
  2. 设置障碍物的数量为地图大小除以8;
  3. 调用GenerateObstacle生成随机障碍物;
  4. 在地图的中间生成一个斜着的障碍物;
  5. 随机生成其他几个障碍物;
  6. 障碍物的方向也是随机的;
  7. 定义一个方法来判断某个节点是否是障碍物;

算法主体

有了基本的数据结构之后,我们就可以开始实现算法主体了。

这里我们通过一个类来封装我们的算法。

首先实现一些算法需要的基本函数,它们如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# a_star.py

import sys
import time

import numpy as np

from matplotlib.patches import Rectangle

import point
import random_map

class AStar:
def __init__(self, map):
self.map=map
self.open_set = []
self.close_set = []

def BaseCost(self, p):
x_dis = p.x
y_dis = p.y
# Distance to start point
return x_dis + y_dis + (np.sqrt(2) - 2) * min(x_dis, y_dis)

def HeuristicCost(self, p):
x_dis = self.map.size - 1 - p.x
y_dis = self.map.size - 1 - p.y
# Distance to end point
return x_dis + y_dis + (np.sqrt(2) - 2) * min(x_dis, y_dis)

def TotalCost(self, p):
return self.BaseCost(p) + self.HeuristicCost(p)

def IsValidPoint(self, x, y):
if x < 0 or y < 0:
return False
if x >= self.map.size or y >= self.map.size:
return False
return not self.map.IsObstacle(x, y)

def IsInPointList(self, p, point_list):
for point in point_list:
if point.x == p.x and point.y == p.y:
return True
return False

def IsInOpenList(self, p):
return self.IsInPointList(p, self.open_set)

def IsInCloseList(self, p):
return self.IsInPointList(p, self.close_set)

def IsStartPoint(self, p):
return p.x == 0 and p.y ==0

def IsEndPoint(self, p):
return p.x == self.map.size-1 and p.y == self.map.size-1

这里的函数说明如下:

  • __init__:类的构造函数。
  • BaseCost:节点到起点的移动代价,对应了上文的g(n)g(n)
  • HeuristicCost:节点到终点的启发函数,对应上文的h(n)h(n)。由于我们是基于网格的图形,所以这个函数和上一个函数用的是对角距离。
  • TotalCost:代价总和,即对应上面提到的f(n)f(n)
  • IsValidPoint:判断点是否有效,不在地图内部或者障碍物所在点都是无效的。
  • IsInPointList:判断点是否在某个集合中。
  • IsInOpenList:判断点是否在open_set中。
  • IsInCloseList:判断点是否在close_set中。
  • IsStartPoint:判断点是否是起点。
  • IsEndPoint:判断点是否是终点。

有了上面这些辅助函数,就可以开始实现算法主逻辑了,相关代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# a_star.py
def RunAndSaveImage(self, ax, plt):
start_time = time.time()

start_point = point.Point(0, 0)
start_point.cost = 0
self.open_set.append(start_point)

while True:
index = self.SelectPointInOpenList()
if index < 0:
print('No path found, algorithm failed!!!')
return
p = self.open_set[index]
rec = Rectangle((p.x, p.y), 1, 1, color='c')
ax.add_patch(rec)
self.SaveImage(plt)

if self.IsEndPoint(p):
return self.BuildPath(p, ax, plt, start_time)

del self.open_set[index]
self.close_set.append(p)

# Process all neighbors
x = p.x
y = p.y
self.ProcessPoint(x-1, y+1, p)
self.ProcessPoint(x-1, y, p)
self.ProcessPoint(x-1, y-1, p)
self.ProcessPoint(x, y-1, p)
self.ProcessPoint(x+1, y-1, p)
self.ProcessPoint(x+1, y, p)
self.ProcessPoint(x+1, y+1, p)
self.ProcessPoint(x, y+1, p)

这段代码应该不需要太多解释了,它就是根据前面的算法逻辑进行实现。为了将结果展示出来,我们在算法进行的每一步,都会借助于matplotlib库将状态保存成图片。

上面这个函数调用了其他几个函数代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# a_star.py
def SaveImage(self, plt):
millis = int(round(time.time() * 1000))
filename = './' + str(millis) + '.png'
plt.savefig(filename)

def ProcessPoint(self, x, y, parent):
if not self.IsValidPoint(x, y):
return # Do nothing for invalid point
p = point.Point(x, y)
if self.IsInCloseList(p):
return # Do nothing for visited point
print('Process Point [', p.x, ',', p.y, ']', ', cost: ', p.cost)
if not self.IsInOpenList(p):
p.parent = parent
p.cost = self.TotalCost(p)
self.open_set.append(p)

def SelectPointInOpenList(self):
index = 0
selected_index = -1
min_cost = sys.maxsize
for p in self.open_set:
cost = self.TotalCost(p)
if cost < min_cost:
min_cost = cost
selected_index = index
index += 1
return selected_index

def BuildPath(self, p, ax, plt, start_time):
path = []
while True:
path.insert(0, p) # Insert first
if self.IsStartPoint(p):
break
else:
p = p.parent
for p in path:
rec = Rectangle((p.x, p.y), 1, 1, color='g')
ax.add_patch(rec)
plt.draw()
self.SaveImage(plt)
end_time = time.time()
print('===== Algorithm finish in', int(end_time-start_time), ' seconds')

这三个函数应该是比较容易理解的:

  • SaveImage:将当前状态保存到图片中,图片以当前时间命名。
  • ProcessPoint:针对每一个节点进行处理:如果是没有处理过的节点,则计算优先级设置父节点,并且添加到open_set中。
  • SelectPointInOpenList:从open_set中找到优先级最高的节点,返回其索引。
  • BuildPath:从终点往回沿着parent构造结果路径。然后从起点开始绘制结果,结果使用绿色方块,每次绘制一步便保存一个图片。

测试入口

最后是程序的入口逻辑,使用上面写的类来查找路径:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# main.py

import numpy as np
import matplotlib.pyplot as plt

from matplotlib.patches import Rectangle

import random_map
import a_star

plt.figure(figsize=(5, 5))

map = random_map.RandomMap() ①

ax = plt.gca()
ax.set_xlim([0, map.size]) ②
ax.set_ylim([0, map.size])

for i in range(map.size): ③
for j in range(map.size):
if map.IsObstacle(i,j):
rec = Rectangle((i, j), width=1, height=1, color='gray')
ax.add_patch(rec)
else:
rec = Rectangle((i, j), width=1, height=1, edgecolor='gray', facecolor='w')
ax.add_patch(rec)

rec = Rectangle((0, 0), width = 1, height = 1, facecolor='b')
ax.add_patch(rec) ④

rec = Rectangle((map.size-1, map.size-1), width = 1, height = 1, facecolor='r')
ax.add_patch(rec) ⑤

plt.axis('equal') ⑥
plt.axis('off')
plt.tight_layout()
#plt.show()

a_star = a_star.AStar(map)
a_star.RunAndSaveImage(ax, plt) ⑦

这段代码说明如下:

  1. 创建一个随机地图;
  2. 设置图像的内容与地图大小一致;
  3. 绘制地图:对于障碍物绘制一个灰色的方块,其他区域绘制一个白色的的方块;
  4. 绘制起点为蓝色方块;
  5. 绘制终点为红色方块;
  6. 设置图像的坐标轴比例相等并且隐藏坐标轴;
  7. 调用算法来查找路径;

由于我们的地图是随机的,所以每次运行的结果可能会不一样,下面是我的电脑上某次运行的结果:

算法变种

A*算法有不少的变种,这里我们介绍最主要的几个。

更多的内容请以访问维基百科:A* Variants

ARA*

ARA* 全称是Anytime Repairing A*,也称为Anytime A*。

与其他Anytime算法一样,它具有灵活的时间成本,即使在它结束之前被中断,也可以返回路径查找或图形遍历问题的有效解决方案。方法是在逐步优化之前生成快速,非最优的结果。

在现实世界的规划问题中,问题的解决时间往往是有限的。与时间相关的规划者对这种情况都会比较熟悉:他们能够快速找到可行的解决方案,然后不断努力改进,直到时间用完为止。

启发式搜索ARA*算法,它根据可用的搜索时间调整其性能边界。它首先使用松散边界快速找到次优解,然后在时间允许的情况下逐渐收紧边界。如果有足够的时间,它会找到可证明的最佳解决方方案。在改进其约束的同时,ARA*重复使用以前的搜索工作,因此,比其他随时搜索方法更有效。

与A*算法不同,Anytime A*算法最重要的功能是,它们可以被停止,然后可以随时重启。该方法使用控制管理器类来处理时间限制以及停止和重新启动A*算法以找到初始的,可能是次优的解决方案,然后继续搜索改进的解决方案,直到达到可证明的最佳解决方案。

关于ARA*的更多内容可以阅读这篇论文:

D*

D*是Dynamic A*的简写,其算法和A*类似,不同的是,其代价的计算在算法运行过程中可能会发生变化。

D*包含了下面三种增量搜索算法:

  • 原始的D*由Anthony Stentz发表。
  • Focussed D*由Anthony Stentz发表,是一个增量启发式搜索算法,结合了A*和原始D*的思想。
  • D* Lite是由Sven Koenig和Maxim Likhachev基于LPA*构建的算法。

所有三种搜索算法都解决了相同的基于假设的路径规划问题,包括使用自由空间假设进行规划。在这些环境中,机器人必须导航到未知地形中的给定目标坐标。它假设地形的未知部分(例如:它不包含障碍物),并在这些假设下找到从当前坐标到目标坐标的最短路径。

然后机器人沿着路径行进。当它观察到新的地图信息(例如以前未知的障碍物)时,它会将信息添加到其地图中,并在必要时将新的最短路径从其当前坐标重新添加到给定的目标坐标。它会重复该过程,直到达到目标坐标或确定无法达到目标坐标。在穿越未知地形时,可能经常发现新的障碍,因此重新计划需要很快。增量(启发式)搜索算法通过使用先前问题的经验来加速搜索当前问题,从而加速搜索类似搜索问题的序列。假设目标坐标没有改变,则所有三种搜索算法都比重复的A*搜索更有效。

D*及其变体已广泛用于移动机器人和自动车辆导航。当前系统通常基于D* Lite而不是原始D*或Focussed D*。

关于D*的更多内容可以阅读这两篇文章:

Field D*

Field D*扩展了D*和D* Lite,是一种基于插值( interpolation-based )的规划算法,它使用线性插值来有效地生成低成本路径,从而消除不必要的转向。

在给定线性插值假设的情况下,路径是最优的,并且在实践中非常有效。该算法目前被各种现场机器人系统使用。

关于Field D*的详细内容可以看下面这篇论文:

Block A*

Block A*扩展自A*,但它操作是一块(block)单元而不是单个单元。

其open_set中的每个条目都是已到达但尚未扩展的块,或者需要重新扩展的块。

open_set中块的优先级称为其堆值(heap value)。与A*类似,Block A*中的基本循环是删除具有最低堆值的条目并将其展开。在扩展期间使用LDDB来计算正在扩展的块中的边界单元的g值。

LDDB是一种新型数据库,它包含了本地邻域边界点之间的距离。

关于Block A*的更多内容可以看下面这篇论文:

参考资料与推荐读物

留言與分享

函数式编程最近几年越炒越热,有函数式编程的语言鄙视没有函数式编程的语言,纯函数式编程的语言鄙视不纯的函数式编程的语言。

那么,到底什么是函数式编程,函数式编程的核心思想又是什么?

函数式编程的第一个特点就是可以把函数作为参数传递给另一个函数,也就是所谓的高阶函数。例如,对数组进行排序,可以传入一个排序函数作为参数:

1
2
String[] array = { "orange", "Pear", "Apple" };
Arrays.sort(array, String::compareToIgnoreCase);

函数式编程的第二个特点就是可以返回一个函数,这样就可以实现闭包或者惰性计算:

以上两个特点还仅仅是简化了代码。从代码的可维护性上讲,函数式编程最大的好处是引用透明,即函数运行的结果只依赖于输入的参数,而不依赖于外部状态,因此,我们常常说函数式编程没有副作用。

没有副作用有个巨大的好处,就是函数内部无状态,即输入确定,输出就是确定的,容易测试和维护。

很多初学者容易纠结“纯”函数式语言,认为只有Haskell这种消除了变量和副作用的语言才是正宗的函数式编程。还有人甚至认为纯函数不能有任何IO操作,包括打行日志都不行。

其实这种纠结是没有意义的,因为计算机底层就是一个完全可变的内存和不可预测输入的系统,追求完美的无副作用是不现实的,我们只需要理解函数式编程的思想,把业务逻辑做到“无副作用”,至于有变量、打日志、读缓存这些无关紧要的“副作用”,根本不用担心,不需要解决,也几乎没法解决。

我们来举个栗子。

比如一个财务软件,需要一个函数专门计算个人所得税,输入是一个IncomeRecord,输出是个税金额:

1
2
3
double calculateIncomeTax(IncomeRecord record) {
...
}

又假设IncomeRecord长这样:

1
2
3
4
5
class IncomeRecord {
String id; // 身份证号
String name; // 姓名
double salary; // 工资
}

先不考虑五险一金这些乱七八糟的东西,我们只关注如何计算个税。为了简化,我们假设直接扣除一个免征额后按20%计算:

1
2
3
4
5
double calculateIncomeTax(IncomeRecord record) {
double threshold = 3500;
double tax = record.salary <= threshold ? 0 : (record.salary - threshold) * 0.2;
return tax;
}

上面这个程序在2018年9月1号前是没问题的,问题是2018年9月1号后起征点调整到了5000,那2018年8月和2018年9月,计算结果应该不一样。怎么改?

普通开发者的改法:那还不简单?直接获取当前日期,返回正确的起征点就行:

1
2
3
4
5
double calculateIncomeTax(IncomeRecord record) {
double threshold = today() < date(2018, 9, 1) ? 3500 : 5000;
double tax = record.salary <= threshold ? 0 : (record.salary - threshold) * 0.2;
return tax;
}

程序是没错,问题是:

同样的输入,8月31号跑,和9月1号跑,结果不一样,难道会计要在9月1号做8月份的工资条,必须把电脑的时间先调到8月份?

用函数式编程的观点思考一下,就发现问题所在:

today()这个函数,返回结果与时间有关,这就造成了calculateIncomeTax()不再是一个纯函数,它与当前时间相关了。

那怎么把calculateIncomeTax()恢复成一个纯函数,同时要支持起征点调整?

方法是把时间相关的变量作为参数传进来,例如,给IncomeRecord增加几个字段:

1
2
3
4
5
6
7
class IncomeRecord {
String id; // 身份证号
String name; // 姓名
double salary; // 工资
int year; // 年
int month; // 月
}

这样我们就可以消除today()的调用:

1
2
3
4
5
double calculateIncomeTax(IncomeRecord record) {
double threshold = date(record.year, record.month) < date(2018, 9) ? 3500 : 5000;
double tax = record.salary <= threshold ? 0 : (record.salary - threshold) * 0.2;
return tax;
}

calculateIncomeTax()又变成了一个纯函数,会计就不用改电脑时间了。

是不是觉得这个例子太简单了?其实简单的函数如果都能写成有状态的,那么复杂的业务逻辑必然写成一锅粥。

举个复杂的栗子:

对于一个股票交易系统,如果我们把输入定义为:开盘前所有股民的现金和持股,以及交易时段的所有订单,那么,输出就是收盘后所有股民的现金和持股:

1
2
3
4
5
6
7
8
StockStatus process(StockStatus old, List<Order> orders) {
...
for (Order order : orders) {
...
sendExchangeResult(...); // 给每一笔成交发送信息
}
...
}

很显然这是一个纯函数,虽然在处理过程中,这个函数会给股民朋友发送各种心跳消息。

如果把交易系统的模型设计成这样一个纯函数,那么理论上我们只需要从股市开市的那一天开始,把所有订单全部处理一遍,就可以正确得到今天收盘后的状态。

或者说,只要取任意一天开盘前的系统状态的备份(就是整个数据库的备份),把当天的订单重新处理一遍,就得到了当天收盘的状态。这个过程可以做任意次,结果不变,因此,非常适合验证代码的修改是否影响了业务流程。

那么问题来了,交易系统中有无数和时间相关的状态,怎么处理成纯函数?这个模型的处理,可比计算个税复杂多了。

这就是函数式编程的精髓:业务系统模型无状态。模型的好坏,直接影响到代码的正确性、可靠性、稳定性,以及是否需要996。

留言與分享

作者的圖片

Kein Chan

這是獨立全棧工程師Kein Chan的技術博客
分享一些技術教程,命令備忘(cheat-sheet)等


全棧工程師
資深技術顧問
數據科學家
Hit廣島觀光大使


Tokyo/Macau