Let's Talk About Server-Side Hot Updates

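With DevOps and microservices now widespread, release cycles keep getting shorter, and tolerance for the old style of upgrade that requires downtime keeps shrinking. Without further ado, let's get to the topic.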

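What we need to solve

[Figure: tXcilt.png]
What we need to solve is this: while a service process is running, we want it to switch to new code logic on the spot, without the outside world noticing. That involves at least the following:

  • No service state is lost: for example monitoring state, or the state of data being processed
  • No service interruption: users must not notice any interruption

With that goal in mind, let's look at how it can be done.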

Approach 1: Hot-updating the logic

+-------------------------------------------------+
|                                                 |
|          Dynamic script runtime layer           |
|                                                 |
+-------------------------------------------------+
+-------------------------------------------------+
|                                                 |
|        C++/Java/Go data receiving layer         |
|                                                 |
+-------------------------------------------------+

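This is a fairly common approach. A scripting engine embedded in the process essentially compiles the script code into opcodes held in memory and executes those, so implementing "hot update" on this kind of server is simple: sync the new script to the server, then send the server a signal so that it re-reads the script code into memory.

[Figure: tXRJ78.png]

Caveats

  1. The script should have been verified in a test environment. Uploading a broken script can easily have disastrous consequences; at the author's company this has to go through a release audit process.
  2. Long-running requests need the old script to be kept until they have finished.
  3. Upgrading the runtime layer itself still requires downtime or some form of self-bootstrapping.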
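
The reload flow above (ship a new script, signal the process, re-read the script) and the second caveat (requests already running keep the old script) can be sketched roughly as follows. This is a minimal illustration, not any particular engine's implementation: `ScriptHost`, the `handle(request)` entry point, `install_reload_signal` and the choice of SIGHUP are all assumptions made for the example.

```python
import signal
import types


class ScriptHost:
    """Hypothetical host that runs a `handle(request)` function loaded from a script file."""

    def __init__(self, path):
        self.path = path
        self.handler = None
        self.reload()

    def reload(self, *_signal_args):
        # Compile the script into a fresh module so old and new code never mix.
        with open(self.path) as f:
            source = f.read()
        module = types.ModuleType("hot_script")
        exec(compile(source, self.path, "exec"), module.__dict__)
        # Swap only after the new script loaded successfully; a broken script
        # raises above and the previous handler stays in place.
        self.handler = module.handle

    def handle(self, request):
        # Take a snapshot: a request that is already running keeps the version
        # it started with, even if reload() runs in the meantime.
        handler = self.handler
        return handler(request)


def install_reload_signal(host):
    # Assumption: SIGHUP is used as the "reload" signal (Unix only).
    signal.signal(signal.SIGHUP, host.reload)
```

With this shape, a script that fails to compile never replaces the running one, which softens the first caveat a little but does not replace testing the script beforehand.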

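Examples

This kind of hot update has many similar forms and variants; let's look at one or two.

Log4j2 logging system

With a logging system we often need to change the log level at runtime, for example from Info to Warn. As you may remember, we edit log4j2.xml on the server and, after a while, the log level becomes what we want (Log4j2 re-checks the file when the configuration sets monitorInterval). This is a textbook variant of the same dynamic capability.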

org.apache.logging.log4j.core.LoggerContext

// Excerpt from LoggerContext: push a Configuration to every registered Logger.
public void updateLoggers() {
    updateLoggers(this.configuration);
}

public void updateLoggers(final Configuration config) {
    final Configuration old = this.configuration;
    for (final Logger logger : loggerRegistry.getLoggers()) {
        logger.updateConfiguration(config);
    }
}

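Every Logger object has a corresponding Configuration object that holds its configuration, that is, what we wrote in log4j2.xml. All Log4j2 has to do is watch whether the file has changed; that code lives in org.apache.logging.log4j.core.util.WatchManager and is not covered here.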
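
To make the pattern concrete, here is a small Python analogue of "watch the config file, then push the new configuration to the logger". It is only a sketch of the idea and not how Log4j2's WatchManager is written: `LevelWatcher`, the JSON file layout (`{"level": "WARNING"}`) and the polling by modification time are assumptions for the example.

```python
import json
import logging
import os


class LevelWatcher:
    """Hypothetical watcher: re-applies a logger's level when a JSON config file changes."""

    def __init__(self, path, logger):
        self.path = path
        self.logger = logger
        self._last_mtime = None

    def check(self):
        """Reload if the file's modification time changed; return True when reloaded."""
        mtime = os.stat(self.path).st_mtime_ns
        if mtime == self._last_mtime:
            return False
        self._last_mtime = mtime
        with open(self.path) as f:
            level = json.load(f)["level"]  # e.g. {"level": "WARNING"}
        self.logger.setLevel(level)  # setLevel accepts level names as strings
        return True
```

A server would call `check()` from a timer or background thread at a fixed interval, which is why a level change only takes effect "after a while".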

ShardingSphere dynamic configuration

WIP

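Approach 2: Add a gateway

Any problem in software engineering that can't be solved directly gets solved by adding a layer of indirection (:doge). So let's look at a solution based on a middle layer.

[Figure: tXWfIS.png]

The gateway controls where new traffic goes. We run two versions of the service side by side and split by timeline: requests arriving after a given point in time are all forwarded to the new instances. Users notice nothing, but we now need some way to share state. A simple business system can persist all of its data to a database and use that as its state store. Some systems, however, such as Prometheus, hold a lot of state themselves, and for those we inevitably need a shared-memory mechanism.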
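
The timeline split can be sketched in a few lines. This is an illustration under stated assumptions, not a real gateway: `CutoverGateway`, `make_backend` and the plain dict standing in for the shared state store are all hypothetical names introduced for the example.

```python
import time


def make_backend(version, shared_state):
    """Build a fake service instance; both versions use the same state store."""
    def backend(request):
        shared_state["requests"] = shared_state.get("requests", 0) + 1
        return "%s handled %s (#%d)" % (version, request, shared_state["requests"])
    return backend


class CutoverGateway:
    """Hypothetical gateway: requests at or after `cutover_at` go to the new version."""

    def __init__(self, old_backend, new_backend, cutover_at):
        self.old_backend = old_backend
        self.new_backend = new_backend
        self.cutover_at = cutover_at

    def route(self, request, now=None):
        # `now` can be injected for testing; by default use the wall clock.
        now = time.time() if now is None else now
        backend = self.new_backend if now >= self.cutover_at else self.old_backend
        return backend(request)
```

Because the counter lives in the shared store rather than in either instance, the request numbering continues across the cutover, which is the property the paragraph above asks for.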

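Example

Many companies call this kind of implementation a canary (gray) release.

Istio canary release

There are many ways to do a canary release today; let's take the currently popular Istio as an example.
[Figure: tXIoDA.png]

Say we have a service called addedvalues.
[Figure: txmtIJ.png]
In production its traffic is already split 50%/50% between V1 and V2. Now we want to release a new version and forward all traffic to it.
[Figure: txmHoQ.png]

All we need to do is edit the VirtualService custom resource: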

# Before: traffic is split 50/50 between v1 and v2
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: addedvalues
spec:
  hosts:
  - addedvalues
  http:
  - route:
    - destination:
        host: addedvalues
        subset: v2
      weight: 50
    - destination:
        host: addedvalues
        subset: v1
      weight: 50

# After: all traffic goes to the new version, v3
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: addedvalues
spec:
  hosts:
  - addedvalues
  http:
  - route:
    - destination:
        host: addedvalues
        subset: v3
      weight: 100
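
To see what the weights mean in practice, the following sketch simulates a weighted pick per request. It only illustrates the semantics of `weight`; it is not how Istio or Envoy implement routing, and `pick_subset` and `simulate` are hypothetical helpers.

```python
import random
from collections import Counter


def pick_subset(routes, rng=random):
    """Pick one subset name from [(subset, weight), ...] proportionally to its weight."""
    subsets = [name for name, _ in routes]
    weights = [weight for _, weight in routes]
    return rng.choices(subsets, weights=weights, k=1)[0]


def simulate(routes, requests=10_000, seed=0):
    """Count how many simulated requests land on each subset."""
    rng = random.Random(seed)
    return Counter(pick_subset(routes, rng) for _ in range(requests))
```

Calling `simulate([("v2", 50), ("v1", 50)])` gives roughly equal counts for the two subsets, while `simulate([("v3", 100)])` sends every request to v3.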

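It really is that simple. The charm of CNCF is just impossible to hide.

Spring Cloud graceful shutdown

[Figure: txK3sP.png]
In the Spring Cloud ecosystem we already have a service discovery component, so a gateway is not strictly required. We can use a variant of the same idea; in essence it still relies on a middle layer so that the outside world notices nothing. If you are interested, see Spring's graceful shutdown.

Approach 3: Transferring socket ownership

Everything so far has been implemented at the application level. There is another way, a mechanism the author only discovered while reading Envoy's implementation.

Let's recall good old socket programming.
[Figure: txlHoT.png]

In socket programming, following Linux's plain philosophy that everything is a file, what user space sees is an FD, as shown in the figure:
[Figure: txlvl9.png]

Between the kernel and user space there is a mapping layer. Its purpose is to let multiple processes share an FD handle that has been opened. For example, when two programs open the same file, the kernel really has just one file open and only maintains a separate seek offset for each.
[Figure: tx1iFO.png]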
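
The difference between a user-space FD number and the kernel object behind it can be observed directly. In the sketch below (the function name `offsets_after_read` is made up for the example), a duplicated FD shares the seek offset with the original, while an independent `open()` of the same file gets its own offset. An FD received from another process behaves like the duplicated one.

```python
import os
import tempfile


def offsets_after_read():
    """Read through one FD, then report the offsets seen via a dup and a second open()."""
    with tempfile.NamedTemporaryFile() as tmp:
        tmp.write(b"0123456789")
        tmp.flush()
        fd_a = os.open(tmp.name, os.O_RDONLY)  # first open() of the file
        fd_b = os.dup(fd_a)                    # second number, same kernel entry
        fd_c = os.open(tmp.name, os.O_RDONLY)  # independent open() of the same file
        try:
            os.read(fd_a, 4)  # advance the offset through fd_a only
            return (os.lseek(fd_b, 0, os.SEEK_CUR),
                    os.lseek(fd_c, 0, os.SEEK_CUR))
        finally:
            for fd in (fd_a, fd_b, fd_c):
                os.close(fd)


print(offsets_after_read())  # prints (4, 0)
```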

So the FD seen in user space is really just an alias for a kernel-side FD. Could we then find a way to share our ServerSocket with a different process, to get this effect:

[Figure: tx1a7V.png]

Examples

Transferring the ServerSocket: a simplification of what Envoy does

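# main.py: starts the receiver (children.py) first, then the sender (parent.py).
import subprocess
import time

if __name__ == '__main__':
    # children.py creates the Unix socket and waits for the listening socket's FD.
    child = subprocess.Popen(["python3", "children.py"])
    time.sleep(3)
    # parent.py serves on :8080, then hands its listening socket over.
    parent = subprocess.Popen(["python3", "parent.py"])

    time.sleep(99999)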

# parent.py: the old process. It serves HTTP on :8080, then passes its
# listening socket to the new process over a Unix domain socket.
import array
import logging
import select
import socket
import threading
import time

logger = logging.getLogger()
logging.basicConfig(format='%(asctime)-15s pid=%(process)d %(side)s: %(message)s', level=logging.INFO)

EOL1 = b'\n\n'
EOL2 = b'\n\r\n'
response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
response += b'Hello, world!'

running = True         # True while this process still accepts new connections
shutting_down = False  # True while it only drains connections it already accepted

connections = {}
requests = {}
responses = {}


# Function from https://docs.python.org/3/library/socket.html#socket.socket.sendmsg
def send_fds(sock, msg, fds):
    return sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array("i", fds))])


def new_send_fd(socket_filename, send_sock):
    global running, shutting_down
    sock = socket.socket(family=socket.AF_UNIX)

    logger.info("Connecting to socket file '%s'" % socket_filename, extra={"side": "SEND ==>"})
    last_error = None
    for attempt in range(10):
        try:
            sock.connect(socket_filename)
            break
        except OSError as e:
            # Keep the error: Python 3 unbinds `e` when the except block ends.
            last_error = e
            logger.error("Socket file '%s' not available yet, try %d/10 (%s)" % (socket_filename, attempt, e),
                         extra={"side": "SEND ==>"})
            time.sleep(0.5)
    else:  # no break: every attempt failed
        raise last_error

    logger.info("Connected", extra={"side": "SEND ==>"})

    logger.info("Sender delaying 10s to demonstrate a blocking receiver...", extra={"side": "SEND ==>"})
    time.sleep(10)
    file_descriptor_int = send_sock.fileno()
    logger.info("Sending file descriptors %d" % file_descriptor_int, extra={"side": "SEND ==>"})
    send_fds(sock, b"some payload", [file_descriptor_int])

    # Switch the main thread from accepting to draining.
    shutting_down = True
    running = False


def shutdown():
    # Give in-flight connections 5 seconds to finish, then end the drain loop.
    global shutting_down
    time.sleep(5)
    shutting_down = False


def handle_events(epoll, serversocket, accept_new):
    """Run one epoll round; accept new connections only while accept_new is True."""
    for fileno, event in epoll.poll(1):
        if fileno == serversocket.fileno():
            if not accept_new:
                # Already handed over: leave the pending connection to the new process.
                continue
            connection, address = serversocket.accept()
            connection.setblocking(0)
            epoll.register(connection.fileno(), select.EPOLLIN)
            connections[connection.fileno()] = connection
            requests[connection.fileno()] = b''
            responses[connection.fileno()] = response
        elif event & select.EPOLLIN:
            requests[fileno] += connections[fileno].recv(1024)
            if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
                epoll.modify(fileno, select.EPOLLOUT)
        elif event & select.EPOLLOUT:
            byteswritten = connections[fileno].send(responses[fileno])
            responses[fileno] = responses[fileno][byteswritten:]
            if len(responses[fileno]) == 0:
                epoll.modify(fileno, 0)
                connections[fileno].shutdown(socket.SHUT_RDWR)
        elif event & select.EPOLLHUP:
            epoll.unregister(fileno)
            connections[fileno].close()
            del connections[fileno]


if __name__ == '__main__':
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    serversocket.bind(('0.0.0.0', 8080))
    serversocket.listen(1)
    serversocket.setblocking(True)

    threading.Thread(target=new_send_fd, args=("/tmp/uds_socket", serversocket)).start()

    epoll = select.epoll()
    epoll.register(serversocket.fileno(), select.EPOLLIN)
    try:
        # Phase 1: normal service until the listening socket has been handed over.
        while running:
            handle_events(epoll, serversocket, accept_new=True)
        # Phase 2: finish the connections this process already accepted.
        threading.Thread(target=shutdown).start()
        while shutting_down:
            handle_events(epoll, serversocket, accept_new=False)
    finally:
        epoll.unregister(serversocket.fileno())
        epoll.close()
        # serversocket.close()
        print("parent will stopping........")

# children.py: the new process. It receives the listening socket's FD over a
# Unix domain socket and keeps serving on it.
import array
import grp
import logging
import os
import pwd
import select
import socket

logger = logging.getLogger()
logging.basicConfig(format='%(asctime)-15s pid=%(process)d %(side)s: %(message)s', level=logging.INFO)

EOL1 = b'\n\n'
EOL2 = b'\n\r\n'
response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
response += b'Hello, world!'


# Function from https://docs.python.org/3/library/socket.html#socket.socket.recvmsg
def recv_fds(sock, msglen, maxfds):
    fds = array.array("i")  # Array of ints
    msg, ancdata, flags, addr = sock.recvmsg(msglen, socket.CMSG_LEN(maxfds * fds.itemsize))
    for cmsg_level, cmsg_type, cmsg_data in ancdata:
        if cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS:
            # Append data, ignoring any truncated integers at the end.
            # frombytes() replaces fromstring(), which Python 3.9 removed.
            fds.frombytes(cmsg_data[:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
    return msg, list(fds)


def new_recv_fd(socket_filename):
    # Remove a stale socket file left over from a previous run.
    try:
        os.remove(socket_filename)
    except OSError:
        pass

    logger.info('Starting privledges: %d/%d = %s/%s' %
                (os.getuid(),
                 os.getgid(),
                 pwd.getpwuid(os.getuid())[0],
                 grp.getgrgid(os.getgid())[0]), extra={"side": "<== RECV"})

    if os.getuid() == 0:
        running_uid = pwd.getpwnam("nobody")[2]
        running_gid = grp.getgrnam("nogroup")[2]
        os.setgid(running_gid)
        os.setuid(running_uid)

        logger.info('Dropped privledges: currently %d/%d = %s/%s' %
                    (os.getuid(),
                     os.getgid(),
                     pwd.getpwuid(os.getuid())[0],
                     grp.getgrgid(os.getgid())[0]), extra={"side": "<== RECV"})

    logger.info("Receiver delaying creation of socket to demonstrate sender retries...", extra={"side": "<== RECV"})
    logger.info("Binding to (and creating) AF_UNIX socket socket file '%s'" % socket_filename,
                extra={"side": "<== RECV"})
    sock = socket.socket(family=socket.AF_UNIX)
    sock.bind(socket_filename)
    sock.listen()
    logger.info("Socket listening %s" % sock, extra={"side": "<== RECV"})

    # Waste a file descriptor, so the fd numbers on source and receive sides don't match (they'll be both 6 by default)
    leak_a_file_descriptor = open("/etc/hosts")

    if not hasattr(sock, "recvmsg"):
        raise RuntimeError(
            "We don't have a `Socket.recvmsg` in this implementation of python (eg, system python 2.7 on OSX")

    # Accept exactly 1 connection
    client, info = sock.accept()
    logger.info("Connected, client=%s" % client, extra={"side": "<== RECV"})

    logger.info("Blocking until message is received", extra={"side": "<== RECV"})
    msg, fds = recv_fds(client, 100, 4)
    logger.info("Received message msg=%s fds=%s" % (msg, fds), extra={"side": "<== RECV"})
    return fds[0]


if __name__ == '__main__':
    fdno = new_recv_fd("/tmp/uds_socket")
    logger.info("got from parent fd %d" % fdno, extra={"side": "<== RECV"})
    # Wrap the received FD. It is already bound to :8080 and listening,
    # so no bind() or listen() is needed here.
    serversocket = socket.socket(fileno=fdno)

    epoll = select.epoll()
    epoll.register(serversocket.fileno(), select.EPOLLIN)
    try:
        connections = {}
        requests = {}
        responses = {}
        while True:
            events = epoll.poll(1)
            for fileno, event in events:
                if fileno == serversocket.fileno():
                    connection, address = serversocket.accept()
                    connection.setblocking(0)
                    epoll.register(connection.fileno(), select.EPOLLIN)
                    connections[connection.fileno()] = connection
                    requests[connection.fileno()] = b''
                    responses[connection.fileno()] = response
                elif event & select.EPOLLIN:
                    requests[fileno] += connections[fileno].recv(1024)
                    if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
                        epoll.modify(fileno, select.EPOLLOUT)
                elif event & select.EPOLLOUT:
                    byteswritten = connections[fileno].send(responses[fileno])
                    responses[fileno] = responses[fileno][byteswritten:]
                    if len(responses[fileno]) == 0:
                        epoll.modify(fileno, 0)
                        connections[fileno].shutdown(socket.SHUT_RDWR)
                elif event & select.EPOLLHUP:
                    epoll.unregister(fileno)
                    connections[fileno].close()
                    del connections[fileno]
    finally:
        epoll.unregister(serversocket.fileno())
        epoll.close()
        serversocket.close()

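The run produces the following output (the DeprecationWarning comes from array.fromstring(), which Python 3.9 removed in favour of frombytes()):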

✗ python3 main.py
2020-06-13 23:54:28,647 pid=114797 <== RECV: Starting privledges: 1000/1000 = yanick/yanick
2020-06-13 23:54:28,647 pid=114797 <== RECV: Receiver delaying creation of socket to demonstrate sender retries...
2020-06-13 23:54:28,647 pid=114797 <== RECV: Binding to (and creating) AF_UNIX socket socket file '/tmp/uds_socket'
2020-06-13 23:54:28,647 pid=114797 <== RECV: Socket listening <socket.socket fd=3, family=AddressFamily.AF_UNIX, type=SocketKind.SOCK_STREAM, proto=0, laddr=/tmp/uds_socket>
2020-06-13 23:54:31,657 pid=114804 SEND ==>: Connecting to socket file '/tmp/uds_socket'
2020-06-13 23:54:31,657 pid=114804 SEND ==>: Connected
2020-06-13 23:54:31,657 pid=114804 SEND ==>: Sender delaying 10s to demonstrate a blocking receiver...
2020-06-13 23:54:31,657 pid=114797 <== RECV: Connected, client=<socket.socket fd=5, family=AddressFamily.AF_UNIX, type=SocketKind.SOCK_STREAM, proto=0, laddr=/tmp/uds_socket>
2020-06-13 23:54:31,657 pid=114797 <== RECV: Blocking until message is received
2020-06-13 23:54:41,667 pid=114804 SEND ==>: Sending file descriptors 3
children.py:77: DeprecationWarning: fromstring() is deprecated. Use frombytes() instead.
msg, fds = recv_fds(client, 100, 4)
2020-06-13 23:54:41,668 pid=114797 <== RECV: Received message msg=b'some payload' fds=[6]
2020-06-13 23:54:41,669 pid=114797 <== RECV: got from parent fd 6
parent will stopping........

Checking the listening state before and after the handoff also shows that the process owning the listening socket has changed:

netstat -tunlp
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 0.0.0.0:8080 0.0.0.0:* LISTEN 114804/python3

netstat -tunlp
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 0.0.0.0:8080 0.0.0.0:* LISTEN 114797/python3

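Source code: requires Python 3+

Caveats

  • Long-lived connections are not fully handled here; see the "Transferring the ClientSocket" section below.
  • Note that epoll has to run in level-triggered (LT) mode, which is its default. The parent deliberately leaves some events on the listening socket unhandled, and because the two processes compete for them, the kernel has to report them again.
  • As far as the author could find, passing an FD to another process on Linux only works over a Unix domain socket, so this feature is platform-specific.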
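
The level-triggered behaviour mentioned in the caveats can be checked with a short experiment (Linux only, since it relies on `select.epoll`). The helper name `readiness_reports` is made up for this sketch. A connection is left pending in the accept queue and epoll is polled twice without calling `accept()`: in the default LT mode the listening socket is reported both times, while with `EPOLLET` (edge-triggered) it is reported only once.

```python
import select
import socket


def readiness_reports(extra_flags=0):
    """Return whether two consecutive polls report a pending, un-accepted connection."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    listener.listen()
    client = socket.create_connection(listener.getsockname())
    epoll = select.epoll()
    try:
        epoll.register(listener.fileno(), select.EPOLLIN | extra_flags)
        first = bool(epoll.poll(0.5))
        second = bool(epoll.poll(0.5))  # nobody called accept() in between
        return first, second
    finally:
        epoll.close()
        client.close()
        listener.close()
```

Comparing `readiness_reports()` with `readiness_reports(select.EPOLLET)` shows why LT matters here: a connection that one process skips is offered again instead of being silently lost.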

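Transferring the ClientSocket

The ServerSocket example above does not handle long-lived connections well, because it only hands over the ServerSocket. It does point the way, though: if we can move a ClientSocket that is being served, together with its current state, to the new process, we get the same result, don't we?

This time we simplify the network handling and use plain, traditional blocking IO. On the Nth request the server replies with N (counting from 0 in the sample below).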

telnet 127.0.0.1 8080
1
0 got message: b'1\r\n'
2
1 got message: b'2\r\n'

main.py is the same as above and is omitted.

# parent.py: the old process. It answers clients on :8080, then passes the
# live client socket and its request counter to the new process.
import array
import logging
import os
import socket
import threading
import time

logger = logging.getLogger()
logging.basicConfig(format='%(asctime)-15s pid=%(process)d %(side)s: %(message)s', level=logging.INFO)


# Function from https://docs.python.org/3/library/socket.html#socket.socket.sendmsg
def send_fds(sock, msg, fds):
    return sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array("i", fds))])


def new_send_fd(socket_filename, send_sock, state):
    sock = socket.socket(family=socket.AF_UNIX)
    logger.info("Connecting to socket file '%s'" % socket_filename, extra={"side": "SEND ==>"})
    last_error = None
    for attempt in range(10):
        try:
            sock.connect(socket_filename)
            break
        except OSError as e:
            # Keep the error: Python 3 unbinds `e` when the except block ends.
            last_error = e
            logger.error("Socket file '%s' not available yet, try %d/10 (%s)" % (socket_filename, attempt, e),
                         extra={"side": "SEND ==>"})
            time.sleep(0.5)
    else:  # no break: every attempt failed
        raise last_error

    logger.info("Connected", extra={"side": "SEND ==>"})
    logger.info("Sender delaying 10s to demonstrate a blocking receiver...", extra={"side": "SEND ==>"})
    time.sleep(10)

    # Ask the serving loop to stop and wait until it has, so the counter is final.
    state["running"] = False
    state["stopped"].wait()

    file_descriptor_int = send_sock.fileno()
    logger.info("Sending file descriptors %d, state: %d" % (file_descriptor_int, state["count"]),
                extra={"side": "SEND ==>"})
    # SCM_RIGHTS can only carry file descriptors, so the counter travels in the message payload.
    send_fds(sock, str(state["count"]).encode(), [file_descriptor_int])


def new_connection(conn, status):
    state = {"count": status, "running": True, "stopped": threading.Event()}
    threading.Thread(target=new_send_fd, args=("/tmp/uds_socket", conn, state)).start()
    conn.settimeout(1)  # wake up regularly to notice the handoff request
    while state["running"]:
        try:
            data = conn.recv(1024)
        except socket.timeout:
            continue
        logger.info("Got Message At %d" % os.getpid(), extra={"side": "SEND ==>"})
        if not data:
            break
        conn.sendall(bytes(("%d got message: %s\n" % (state["count"], data)), 'utf-8'))
        state["count"] += 1
    # A timeout puts the socket in non-blocking mode, and that mode travels
    # with the FD. Restore blocking mode before the socket is handed over.
    conn.settimeout(None)
    state["stopped"].set()


if __name__ == '__main__':
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # SO_REUSEADDR keeps the setup simple
    serversocket.bind(('0.0.0.0', 8080))
    serversocket.listen()
    logger.info("server listening at 8080", extra={"side": "SEND ==>"})
    try:
        while True:
            conn, addr = serversocket.accept()
            threading.Thread(target=new_connection, args=(conn, 0)).start()
    finally:
        serversocket.close()

# children.py: the new process. It receives the client socket's FD and the
# request counter, then continues the conversation where the parent stopped.
import array
import grp
import logging
import os
import pwd
import socket
import threading
import time

logger = logging.getLogger()
logging.basicConfig(format='%(asctime)-15s pid=%(process)d %(side)s: %(message)s', level=logging.INFO)


# Function from https://docs.python.org/3/library/socket.html#socket.socket.recvmsg
def recv_fds(sock, msglen, maxfds):
    fds = array.array("i")  # Array of ints
    msg, ancdata, flags, addr = sock.recvmsg(msglen, socket.CMSG_LEN(maxfds * fds.itemsize))
    for cmsg_level, cmsg_type, cmsg_data in ancdata:
        if cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS:
            # Append data, ignoring any truncated integers at the end.
            # frombytes() replaces fromstring(), which Python 3.9 removed.
            fds.frombytes(cmsg_data[:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
    return msg, list(fds)


def new_recv_fd(socket_filename):
    # Remove a stale socket file left over from a previous run.
    try:
        os.remove(socket_filename)
    except OSError:
        pass

    logger.info('Starting privledges: %d/%d = %s/%s' %
                (os.getuid(),
                 os.getgid(),
                 pwd.getpwuid(os.getuid())[0],
                 grp.getgrgid(os.getgid())[0]), extra={"side": "<== RECV"})

    if os.getuid() == 0:
        running_uid = pwd.getpwnam("nobody")[2]
        running_gid = grp.getgrnam("nogroup")[2]
        os.setgid(running_gid)
        os.setuid(running_uid)

        logger.info('Dropped privledges: currently %d/%d = %s/%s' %
                    (os.getuid(),
                     os.getgid(),
                     pwd.getpwuid(os.getuid())[0],
                     grp.getgrgid(os.getgid())[0]), extra={"side": "<== RECV"})

    logger.info("Receiver delaying creation of socket to demonstrate sender retries...", extra={"side": "<== RECV"})
    logger.info("Binding to (and creating) AF_UNIX socket socket file '%s'" % socket_filename,
                extra={"side": "<== RECV"})
    sock = socket.socket(family=socket.AF_UNIX)
    sock.bind(socket_filename)
    sock.listen()
    logger.info("Socket listening %s" % sock, extra={"side": "<== RECV"})

    # Waste a file descriptor, so the fd numbers on source and receive sides don't match (they'll be both 6 by default)
    leak_a_file_descriptor = open("/etc/hosts")

    if not hasattr(sock, "recvmsg"):
        raise RuntimeError(
            "We don't have a `Socket.recvmsg` in this implementation of python (eg, system python 2.7 on OSX")

    # Accept exactly 1 connection
    client, info = sock.accept()
    logger.info("Connected, client=%s" % client, extra={"side": "<== RECV"})

    logger.info("Blocking until message is received", extra={"side": "<== RECV"})
    msg, fds = recv_fds(client, 100, 4)
    logger.info("Received message msg=%s fds=%s" % (msg, fds), extra={"side": "<== RECV"})
    # The ancillary data carries the FD; the payload carries the counter.
    return fds[0], int(msg)


def new_connection(conn, status):
    i = status  # continue counting from where the parent stopped
    with conn:
        while True:
            data = conn.recv(1024)
            logger.info("Got Message At %d" % os.getpid(), extra={"side": "<== RECV"})
            if not data:
                break
            conn.sendall(bytes(("%d got message: %s\n" % (i, data)), 'utf-8'))
            i += 1


if __name__ == '__main__':
    fdno, status = new_recv_fd("/tmp/uds_socket")
    logger.info("got from parent fd: %d status: %d" % (fdno, status), extra={"side": "<== RECV"})
    threading.Thread(target=new_connection, args=(socket.socket(fileno=fdno), status)).start()
    logger.info("startup server..", extra={"side": "<== RECV"})
    time.sleep(100)

Accessing the service gives the following result:

$ telnet 127.0.0.1 8080
0 got message: b'\r\n'
1 got message: b'\r\n'
2 got message: b'\r\n'
3 got message: b'\r\n'
4 got message: b'\r\n'

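Caveats

  • Note that the state of the ClientSocket has to be handled as well: when a socket is handed over, the new process also needs to know what state the socket is in at that moment.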
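
One way to think about "the state of the socket" is to split it in two. Bytes that have arrived but have not been read sit in the kernel buffer and travel with the FD; application-level state (here, the request counter) has to be serialized and sent alongside it. The sketch below shows both within a single process using `socket.socketpair()`. The helpers `hand_over`, `take_over` and `demo`, and the JSON layout of the state, are assumptions made for this example.

```python
import array
import json
import socket


def hand_over(channel, conn, state):
    """Send conn's FD as ancillary data and the application state as the payload."""
    payload = json.dumps(state).encode()
    rights = array.array("i", [conn.fileno()])
    channel.sendmsg([payload], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, rights)])


def take_over(channel):
    """Receive one FD plus the JSON state and rebuild a socket object from the FD."""
    fds = array.array("i")
    payload, ancdata, _flags, _addr = channel.recvmsg(4096, socket.CMSG_SPACE(fds.itemsize))
    for level, kind, data in ancdata:
        if level == socket.SOL_SOCKET and kind == socket.SCM_RIGHTS:
            fds.frombytes(data[:len(data) - (len(data) % fds.itemsize)])
    return socket.socket(fileno=fds[0]), json.loads(payload)


def demo():
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))
    listener.listen()
    client = socket.create_connection(listener.getsockname())
    conn, _ = listener.accept()
    client.sendall(b"3\r\n")  # arrives before the handoff; the old owner never reads it

    old_side, new_side = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
    hand_over(old_side, conn, {"count": 2})
    conn.close()  # the old owner lets go; the FD in flight keeps the connection open

    adopted, state = take_over(new_side)
    data = adopted.recv(1024)  # bytes buffered in the kernel travel with the socket
    adopted.sendall(b"%d got message: %r\n" % (state["count"], data))
    reply = client.recv(1024)
    for s in (adopted, client, listener, old_side, new_side):
        s.close()
    return reply
```

Anything the old process had already read from the socket but not yet acted on (a half-parsed request, for example) is application state too and would need to go into the payload in the same way.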

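Summary

A comparison of the hot-upgrade approaches:

| Feature | Script runtime layer | Gateway-based | Socket-based |
| --- | --- | --- | --- |
| Development efficiency | | | |
| Performance | | | |
| Architectural difficulty | | | |
| Notes | Common in frequently changing logic layers | Common in microservice systems | Rarely used, error-prone |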

References