理解kubernetes service

理解service的角度

这篇文章不是关于如何使用kubernetes中的service,而是尝试整理我自己对service的看法,然后加深对service的理解。那么,我是从哪几个角度去看待service呢?

  • service是服务的稳定性保证
  • service是集群中的load balance
  • 通过无selector的service去理解VIP(虚拟ip)
  • service的设计,和不同实现方式的性能

service是服务的稳定性保证

在k8s集群中,无状态的pod副本是可以随时删除、随时创建的,并且重新创建的pod不再保留旧的pod的任何信息,包括ip地址。在这样的情况下,前端应用如何使用后端的这些pod来提供服务就成了问题,因此k8s实现了service这样一个抽象的概念。对于有selector的service,它在被创建的时候会自动创建endpoint资源,这个endpoint中包含了所有的pod的ip和端口,并且在之后的pod的删除、创建中,这个endpoint中会立即更新相关pod的ip和端口信息。同时,service的ip地址是永远固定的,service和endpoint是一一对应的关系。这样,如果前端应用通过固定的service ip来访问pod提供的服务,那么就可以在endpoint中找到一个可用的pod的ip和端口,然后通过一些操作(这个在后面会整理)将数据包转发到指定的pod上即可。

# 你可以通过kubectl查看service和endpoint来加深理解

kubectl -n h2o describe svc h2o

Name:              h2o
Namespace:         h2o
Labels:            app=h2o
Annotations:       kubectl.kubernetes.io/last-applied-configuration:
                     {"apiVersion":"v1","kind":"Service","metadata":{"annotations":{},"labels":{"app":"h2o"},"name":"h2o","namespace":"h2o"},"spec":{"clusterIP...
Selector:          app=h2o
Type:              ClusterIP
IP:                None
Port:              web  54321/TCP
TargetPort:        54321/TCP
Endpoints:         10.42.1.33:54321,10.42.2.139:54321
Session Affinity:  None
Events:            <none> kubectl -n h2o get endpoints h2o
NAME   ENDPOINTS                            AGE
h2o    10.42.1.33:54321,10.42.2.139:54321   4h45m

service通过ip地址的固定来保证服务的稳定性。那为啥service就是可以固定不变的呢?这是因为service本身就是一个抽象的概念啊,它不是一个正在运行的进程,只是一条数据,也正因为如此,它的ip地址和端口号也是不存在的,这些都是存储在etcd中的一条数据。那么k8s是如何通过这样一个虚假的ip和端口将请求转发到真实存在的pod中呢?这就是后面要说的内容了。

service是集群中的load balance

在上一节说到,一个service会对应一个endpoint,这个endpoint中会保存所有当前匹配到的pod的ip和端口号。那么现在有一个http请求过来了,发现endpoint中有三个待选的pod,那么我们使用一定的方式比较公平的选择出一个pod,就轻松的达到了负载均衡的效果。

service load balance

那么k8s中,load balance的策略是什么样的呢?因为不同的service实现方式使用的方法不同,这个内容会在后面整理。

通过无selector的service去理解VIP(虚拟ip)

在前面的内容中,service一直和endpoint、pod关联在一起,那么如果我们的service没有selector,就不会创建endpoint了,也不会关联pod。前面也提到了service是一个抽象的概念,其拥有的ip和port都是假的。其实这个叫做VIP(virtual ip)。那么,如何通过无selector的service来理解VIP呢?

在k8s中创建无selector service的时候,不会自动创建关联的endpoint,更不会去匹配pod了。但是这样的service仍然是拥有ip和port的。我们可以尝试一下:

svc-without-selector.yaml

apiVersion: v1
kind: Service
metadata:
  name: my-service
spec:
  ports:
    - protocol: TCP
      port: 8081
      targetPort: 8081
kubectl apply -f svc-without-selector.yaml

查看一下这个svc的详情:

$ kubectl describe svc my-service

Name:              my-service
Namespace:         default
Labels:            <none>
Annotations:       kubectl.kubernetes.io/last-applied-configuration:
                     {"apiVersion":"v1","kind":"Service","metadata":{"annotations":{},"name":"my-service","namespace":"default"},"spec":{"ports":[{"port":8081,...
Selector:          <none>
Type:              ClusterIP
IP:                10.43.12.208
Port:              <unset>  8081/TCP
TargetPort:        8081/TCP
Endpoints:         <none>
Session Affinity:  None
Events:            <none>

除了拥有ip和端口号,就什么都没有了。这就是说service为什么就是一条数据的原因,10.43.12.208也就是一个VIP。

对于无selector的service还有一个用处,就是让集群内部的应用可以稳定的访问到集群外部的服务。因为service是稳定的,那么集群内部都可以访问这个service,然后让这个service将请求转发到集群外。

这里我们可以手动创建一个endpoint,这个endpoint包含了集群外的两个http服务

apiVersion: v1
kind: Endpoints
metadata:
  name: my-service
subsets:
  - addresses:
      - ip: 192.168.50.99
      - ip: 192.168.50.201
    ports:
      - port: 8081

然后我们先检查一下service,发现endpoints已经更新了。

$ kubectl describe svc my-service

Name:              my-service
Namespace:         default
Labels:            <none>
Annotations:       kubectl.kubernetes.io/last-applied-configuration:
                     {"apiVersion":"v1","kind":"Service","metadata":{"annotations":{},"name":"my-service","namespace":"default"},"spec":{"ports":[{"port":8081,...
Selector:          <none>
Type:              ClusterIP
IP:                10.43.12.208
Port:              <unset>  8081/TCP
TargetPort:        8081/TCP
Endpoints:         192.168.50.201:8081,192.168.50.99:8081
Session Affinity:  None
Events:            <none>

我们在集群内部访问一下(使用kubectl exec到一个pod上):

$ wget my-service:8081 -q -O out | cat out
server 2
$ wget my-service:8081 -q -O out | cat out
server 1
$ wget my-service:8081 -q -O out | cat out
server 2
$ wget my-service:8081 -q -O out | cat out
server 1

service的设计,和不同实现方式的性能

service的设计是以提高性能为前提不断的演进的,这里是关于Service的设计讨论: DESIGN: Services v2。感兴趣的还可以看看k8s-release-v1.0的时候对service的描述: Service

service的设计中有4个角色: Pod、 Service、Ambassador、Portal

  • Pod: k8s集群中的最小调度单位,包含一个或多个容器
  • Service: 一组pod的集合,由标签选择器来关联
  • Ambassador: 中文翻译是大使,是一段可执行的逻辑,它负责实现客户端访问Service,然后将请求转发到一个对应的Pod上。这个Ambassador可以是一个云服务商的服务,也可以是一个单独的pod(比如haproxy),或者是每个节点都有的共享进程(kube-proxy)。
  • Portal: 固定的ip:port对,客户端只要访问这个Portal,请求自然会被转发到Ambassador上,客户端不需要理解Ambassador的具体实现。

最初的设计中是有三种方案,

方案一: 每个服务一个ip,共享的Ambassador。这个ip就是上面说的Portal ip。将服务以及ip、端口广播给所有的kube-proxy实例。kube-proxy设置好iptables来“窃取”所有到Portal(ip,port)的请求,然后将这个请求转发到自己的某个端口上。这里kube-proxy扮演的是Ambassador角色,它会使用round-robin的方法来把请求均衡的分发到Service后面的Pod上。这个方案里,有以下的优点和缺点:

优点:
– 不会有端口冲突
– Service的ip和port都是固定的,方便做DNS A (forward) 和 PTR (reverse)和 SRV 记录。
– iptables可以放在root namespace,即使pods重启了也不需要更新iptables(这是因为iptables是负责将到service ip:port的流量转发到kube-proxy的一个端口上即可)。
– 不需要在pod上预先声明需要的Service。

缺点:

  • kube-proxy是多租户的(需要为所有的service做流量转发)
  • 从kube-proxy转发的流量的源ip不是真实的源ip,
  • 需要为portal预留虚拟ip空间
  • 需要master跟踪和检查所有的portal ip
  • 当service数量级上千后可扩展性不高

方案二: 每个服务一个ip,私有的Ambassador。对每个pod来说,都有一个私有的的ambassador,这要求pod需要先声明它们想先访问那个服务(否则的话,对于集群中的每次Service的添加和删除,都需要kubelet或其他的root-namespace、true-root的用户代理变动到每个pod的namespace下。[iptables规则需要root用户]),这样才能在pod的命名空间下建立iptables规则。

优点:

  • 不会有端口冲突
  • Service的ip和port都是固定的,方便做DNS A (forward) 和 PTR (reverse)和 SRV 记录。
  • 代理不是多租户的
  • 从kube-proxy转发的流量的源ip是真实的源ip,
  • 容易从方案一迁移
  • 需要pod预先声明服务(结构良好)

缺点:

  • iptables是配置在pod的namespace下,但是pod的命名空间重启了就必须重新运行一次
  • 需要为portal预留虚拟ip空间
  • 需要master跟踪和检查所有的portal ip
  • 需要pod预先声明服务(目前还没实现)

方案三:localhost的portal,私有的ambassador

不同于给service分配ip,而是使用本地的端口号作为portal。

介绍完这三种方案后,就可以引入service最终的演进了: userspace->iptables->ipvs。

这里先放一张iptables的工作流程图,方便理解:

iptables

userspace模式

这里的userspace就是方案一的实现,在k8s 1.0的发布中正式启用。userspace的工作原理图如下:

userspace service overview

这种模式,kube-proxy 会监视 Kubernetes master 对 Service 对象和 Endpoints 对象的添加和移除。 对每个 Service,它会在本地 Node 上打开一个端口(随机选择)。 任何连接到“代理端口”的请求,都会被代理到 Service 的backend Pods 中的某个上面(如 Endpoints 所报告的一样)。 使用哪个 backend Pod,是 kube-proxy 基于 SessionAffinity 来确定的。

最后,它安装 iptables 规则,捕获到达该 Service 的 clusterIP(是虚拟 IP)和 Port 的请求,并重定向到代理端口,代理端口再代理请求到 backend Pod。默认情况下,用户空间模式下的kube-proxy通过round-robin选择后端。

这里有一个问题在于,client访问service的clusterIP时,iptables会把流量转发到kube-proxy的某个端口上,这样的话,每次转发都有一个内核态用户态的转换。

iptables模式

iptables service overview

这种模式,kube-proxy 会监视 Kubernetes 控制节点对 Service 对象和 Endpoints 对象的添加和移除。 对每个 Service,它会安装 iptables 规则,从而捕获到达该 Service 的 clusterIP 和端口的请求,进而将请求重定向到 Service 的一组 backend 中的某个上面。 对于每个 Endpoints 对象,它也会安装 iptables 规则,这个规则会选择一个 backend 组合。

默认的策略是,kube-proxy 在 iptables 模式下随机选择一个 backend。类似于这样

iptables -t nat -A PREROUTING -p tcp -d 15.45.23.67 --dport 80 -j DNAT --to-destination 192.168.1.1-192.168.1.10

使用 iptables 处理流量具有较低的系统开销,因为流量由 Linux netfilter 处理,而无需在用户空间和内核空间之间切换。 这种方法也可能更可靠。

如果 kube-proxy 在 iptable s模式下运行,并且所选的第一个 Pod 没有响应,则连接失败。 这与用户空间模式不同:在这种情况下,kube-proxy 将检测到与第一个 Pod 的连接已失败,并会自动使用其他后端 Pod 重试。

您可以使用 Pod readiness 探测器 验证后端 Pod 可以正常工作,以便 iptables 模式下的 kube-proxy 仅看到测试正常的后端。 这样做意味着您避免将流量通过 kube-proxy 发送到已知已失败的Pod。

ipvs模式

ipvs是在Kubernetes v1.11正式可用的。ipvs也是依赖于iptables的,但是它的性能更高。

ipvs service overview

在ipvs模式下,kube-proxy监视Kubernetes服务和端点,调用netlink接口相应地创建IPVS规则,并定期将IPVS规则与Kubernetes服务和端点同步。该控制循环可确保IPVS状态与所需状态匹配。访问服务时,IPVS 将流量定向到后端Pod之一。

IPVS代理模式基于类似于iptables模式的netfilter挂钩函数,但是使用哈希表作为基础数据结构,并且在内核空间中工作。 这意味着,与iptables模式下的 kube-proxy 相比,IPVS 模式下的 kube-proxy 重定向通信的延迟要短,并且在同步代理规则时具有更好的性能。与其他代理模式相比,IPVS 模式还支持更高的网络流量吞吐量。

IPVS提供了更多选项来平衡后端Pod的流量。 这些是:

  • rr: round-robin
  • lc: least connection (smallest number of open connections)
  • dh: destination hashing
  • sh: source hashing
  • sed: shortest expected delay
  • nq: never queue

注意:
要在IPVS模式下运行kube-proxy,必须在启动kube-proxy之前使IPVS Linux在节点上可用。

当 kube-proxy 以 IPVS 代理模式启动时,它将验证 IPVS 内核模块是否可用。 如果未检测到 IPVS 内核模块,则 kube-proxy 将退回到以 iptables 代理模式运行。

ipvs在同步规则、网络带宽、cpu/内存消耗上都明显优于iptables,关于具体的性能数据可以看这篇文章: 华为云在 K8S 大规模场景下的 Service 性能优化实践。ipvs的详细介绍可以看这篇文章:ipvs 基本介绍。ipvs和iptables的对比:kube-proxy 模式对比:iptables 还是 IPVS?

rook ceph的rgw崩溃问题排查

问题

在开发可视化机器学习平台时,集成的FastRCNN实验一直跑不到结束就会出错。有时候是在下载基础模型以及代码包时出错,有时候在train结束后向predict传递artifacts出错。

过程

首先这个问题出现在局域网内,处于开发环境,因此ceph没有做高可用的部署。其次,ceph是用rook这个项目部署在k8s集群中的。

最后,在使用argo做机器学习的资源调度时,会出现大的数据资源下载和转移出现错误。具体表现为:大量数据下载会出现connectiion refused,日志如下:

2019-10-25 02:35:24 (20.1 MB/s) - Connection closed at byte 528482304. Retrying.
--2019-10-25 02:35:25--  (try: 2)  http://rook-ceph-rgw-my-store.rook-ceph/workflow-storage/tho6wHm0UmZeZYbfWBv5lkOZ576838763
Connecting to rook-ceph-rgw-my-store.rook-ceph (rook-ceph-rgw-my-store.rook-ceph)|10.43.126.166|:80... failed: Connection refused.
Resolving rook-ceph-rgw-my-store.rook-ceph (rook-ceph-rgw-my-store.rook-ceph)... 10.43.126.166
Connecting to rook-ceph-rgw-my-store.rook-ceph (rook-ceph-rgw-my-store.rook-ceph)|10.43.126.166|:80... failed: Connection refused.

大量数据上传时也会中断,导致argo无法调用下一步:

日志如下:

NAME            custom-workflow-43-6rhlb.api-train-faster-1699
TYPE            Pod
PHASE           Error
MESSAGE         failed to save outputs: timed out waiting for the condition
START TIME      2019-10-24T06:15:56Z
END TIME        2019-10-24T06:30:34Z
DURATION        14:38 min

这里可能是网络问题,argo的问题或者是ceph的问题。但是当数据量不大的时候不会出现错误。因此检查ceph是否正常

~ » kubectl -n rook-ceph get pods                                                
NAME                                           READY   STATUS      RESTARTS   AGE
csi-cephfsplugin-964zm                         3/3     Running     27         46d
csi-cephfsplugin-dxnbg                         3/3     Running     12         46d
csi-cephfsplugin-provisioner-b66d48bc8-fglq9   4/4     Running     0          12d
csi-cephfsplugin-provisioner-b66d48bc8-x67pd   4/4     Running     0          12d
csi-rbdplugin-5fs2x                            3/3     Running     27         46d
csi-rbdplugin-bddlt                            3/3     Running     12         46d
csi-rbdplugin-provisioner-95dd85d6-7kc4c       5/5     Running     0          12d
csi-rbdplugin-provisioner-95dd85d6-mpjtj       5/5     Running     0          12d
rook-ceph-agent-fs4xq                          1/1     Running     9          46d
rook-ceph-agent-wx6r4                          1/1     Running     4          46d
rook-ceph-mds-myfs-a-774974c8c4-xt2ls          1/1     Running     0          12d
rook-ceph-mds-myfs-b-748d7d7f7d-wftt5          1/1     Running     0          12d
rook-ceph-mgr-a-5f54d44c98-57qcb               1/1     Running     0          12d
rook-ceph-mon-a-6f9fbfc99d-lmb6c               1/1     Running     0          17d
rook-ceph-operator-6f556bcbff-glvt6            1/1     Running     0          12d
rook-ceph-osd-0-7c489dc87b-wkt7x               1/1     Running     0          17d
rook-ceph-osd-1-86cc67cc45-25h4q               1/1     Running     0          12d
rook-ceph-osd-prepare-worknode1-xnmpw          0/1     Completed   0          12d
rook-ceph-rgw-my-store-a-66b7d8cc9d-vrhkm      1/1     Running     65         12d
rook-ceph-tools-5f5dc75fd5-52jbj               1/1     Running     0          12d
rook-discover-g95ws                            1/1     Running     6          46d
rook-discover-vs5fs                            1/1     Running     13         46d

发现ceph rgw重启了65次,这个肯定是不正常的。查看ceph rgw的日志:

$ kubectl -n rook-ceph logs -p rook-ceph-rgw-my-store-a-66b7d8cc9d-vrhkm

# 截取了一部分日志
debug 2019-10-25 02:35:13.473 7f4880b98700  1 ====== starting new request req=0x55aa678c48e0 =====
debug 2019-10-25 02:35:14.549 7f4880b98700  0 ERROR: client_io->complete_request() returned Broken pipe
debug 2019-10-25 02:35:14.549 7f4880b98700  1 ====== req done req=0x55aa678c48e0 op status=0 http_status=200 latency=1.076s ======
debug 2019-10-25 02:35:19.949 7f48e345d700  1 ====== starting new request req=0x55aa54a488e0 =====
debug 2019-10-25 02:35:19.949 7f48e345d700  1 ====== req done req=0x55aa54a488e0 op status=0 http_status=404 latency=0s ======

在我的理解中broken pipe一般出现在向已关闭的连接中写入数据时,会出现这个问题。但是通过日志可以发现,出现broken pipe的错误之后,rgw仍然是在处理请求的,但是部分请求的latency很高。因此这里的broken pipe是表示着rgw开始出现一些异常情况,但不是pod重启的直接原因。

真正导致rgw被杀死的原因是因为rgw进程收到了sigterm信号,然后进程被杀死。

debug 2019-10-25 02:35:24.525 7f494c52f700 -1 received  signal: Terminated from Kernel ( Could be generated by pthread_kill(), raise(), abort(), alarm() ) UID: 0
debug 2019-10-25 02:35:24.525 7f494c52f700  1 handle_sigterm
debug 2019-10-25 02:35:24.525 7f494c52f700  1 handle_sigterm set alarm for 120
debug 2019-10-25 02:35:24.525 7f4962116780 -1 shutting down
debug 2019-10-25 02:35:24.629 7f488cbb0700  0 iterate_obj() failed with -9

使用kubectl describe查看pod的event:

Events:
  Type     Reason     Age                  From                Message
  ----     ------     ----                 ----                -------
  Normal   Killing    15m (x65 over 12d)   kubelet, worknode1  Container rgw failed liveness probe, will be restarted
  Warning  Unhealthy  15m (x249 over 12d)  kubelet, worknode1  Liveness probe failed: Get http://10.42.2.44:80/: net/http: request canceled (Client.Timeout exceeded while awaiting headers)
  Normal   Pulled     15m (x66 over 12d)   kubelet, worknode1  Container image "ceph/ceph:v14.2.2-20190826" already present on machine
  Normal   Created    15m (x66 over 12d)   kubelet, worknode1  Created container rgw
  Normal   Started    15m (x66 over 12d)   kubelet, worknode1  Started container rgw

这里才是真正的重启原因,kubelet检查pod是否存活,但是请求超时了。意味pods出现的故障,因此杀死了pod并重启。

pod的liveness设置:

Liveness:       http-get http://:80/ delay=10s timeout=1s period=10s #success=1 #failure=3

k8s的liveness机制是检查pod中应用程序存活状态并在出错后自动重启的一种机制。提供了三种方式:

  • 在容器内执行命令,如果执行成功,则表示容器是存活并且健康的。否则就重启容器使得应用程序恢复正常。
  • 使用http请求检查,如果返回的状态码是200则表示正常,否则表示失败。
  • 使用tcp连接检查,如果kubelet可以打开指定端口的socket连接,则表示正常,否则表示失败。

在这个场景下出现Warning Unhealthy 15m (x249 over 12d) kubelet, worknode1 Liveness probe failed: Get http://10.42.2.44:80/: net/http: request canceled (Client.Timeout exceeded while awaiting headers),表示kubelet使用http get检查pod的80端口,但是这个请求却超时了。因此杀死了容器并重启,导致大文件(700MB以上)上传/下载失败。

这里kubelet检查的是http://10.42.2.44:80这个地址,我们回过头看一下argo那边的报错信息,Connecting to rook-ceph-rgw-my-store.rook-ceph (rook-ceph-rgw-my-store.rook-ceph)|10.43.126.166|:80... failed: Connection refused.。都是80端口,当然这里千万不能被ip地址误导了,10.42.2.44是pod的ip地址,10.43.126.166是service的ip地址,我们可以验证一下:

~ » kubectl -n rook-ceph get svc                                                 
NAME                              TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
rook-ceph-rgw-my-store            ClusterIP   10.43.126.166   <none>        80/TCP              46d

然后再结合之前debug 2019-10-25 02:35:14.549 7f4880b98700 1 ====== req done req=0x55aa678c48e0 op status=0 http_status=200 latency=1.076s ======这条日志,latency已经超过了1s,而kubelet的liveness超时时间是1s。

现在基本可以得出以下异常流程:

  1. 因为某些原因,导致rgw出现broken pipe的出错,并且部分请求的lantency时间大大提高。
  2. kubelet周期性的对rgw做liveness的检查,并且检查的http就是rgw的80端口,这个端口因为上面的原因导致lantency超过了1s,而liveness检查的timeout只有1s。因此kubelet认为该pod不健康,选择重启。
  3. kubelet向rgw发送了sigterm信号,rgw关闭进程,pod重启。

猜测可能造成这个问题的原因:

  1. ceph所在机器的性能不够,导致响应请求出现问题。
  2. 局域网的网络问题,因为内部的最高带宽只有10MB/s,但是局域网内的设备很多,网络这部分导致了瓶颈。

关于机器性能的问题,我认为是可以排除的,因为机器性能本身很好,并且开发环境几乎没有请求量,接下来就是验证是因为网络问题导致瓶颈,造成部分接口延迟过高被杀死。

为了验证这个猜想,假设这里有三台机器A,B,C,组成了一个k8s集群,ceph是部署在k8s之上的。在A之上,我用dd命令产生一个40G的大文件,然后在B之上使用wget下载。然后在C上面观察ping的延迟是否上升。

A:

$ dd if=/dev/zero of=test bs=1M count=0 seek=40000

$ python -m SimpleHTTPServer 8999

B:

$ wget http://192.168.50.37:8999/test

--2019-10-25 14:18:30--  http://192.168.50.37:8999/test
正在连接 192.168.50.37:8999... 已连接。
已发出 HTTP 请求,正在等待回应... 200 OK
长度: 41943040000 (39G) [application/octet-stream]
正在保存至: “test”

test      3%[==>            ]   1.54G  11.1MB/s    剩余 73m 5s

C:

$ ping 192.168.50.37
PING 192.168.50.37 (192.168.50.37) 56(84) bytes of data.
64 bytes from 192.168.50.37: icmp_seq=1 ttl=64 time=0.384 ms
64 bytes from 192.168.50.37: icmp_seq=2 ttl=64 time=0.373 ms
64 bytes from 192.168.50.37: icmp_seq=3 ttl=64 time=0.336 ms
64 bytes from 192.168.50.37: icmp_seq=4 ttl=64 time=4.90 ms
64 bytes from 192.168.50.37: icmp_seq=5 ttl=64 time=1.18 ms
64 bytes from 192.168.50.37: icmp_seq=6 ttl=64 time=7.74 ms
64 bytes from 192.168.50.37: icmp_seq=7 ttl=64 time=3.51 ms
64 bytes from 192.168.50.37: icmp_seq=8 ttl=64 time=6.66 ms
64 bytes from 192.168.50.37: icmp_seq=9 ttl=64 time=6.31 ms

网络延迟增加的还是很明显的。

这时候使用argo开始一个新的机器学习的实验,但是这个实验的数据量较小,在之前的使用中都没有问题。结果确实出现了问题:

2019-10-25 06:21:04 (8.10 MB/s) - Connection closed at byte 136314880. Retrying.
--2019-10-25 06:21:05--  (try: 2)  http://rook-ceph-rgw-my-store.rook-ceph/workflow-storage/maC8Om6Y3QbSJxSTT9x82rp8908272441
Connecting to rook-ceph-rgw-my-store.rook-ceph (rook-ceph-rgw-my-store.rook-ceph)|10.43.126.166|:80... failed: Connection refused.
Resolving rook-ceph-rgw-my-store.rook-ceph (rook-ceph-rgw-my-store.rook-ceph)... 10.43.126.166
Connecting to rook-ceph-rgw-my-store.rook-ceph (rook-ceph-rgw-my-store.rook-ceph)|10.43.126.166|:80... failed: Connection refused.

那么为了有对照实验,将下载关闭,重新做这个机器学习的实验,结果正常。

解决方法

因为缺少了对ceph这块源代码的研究,上面的结论并不一定正确。但是可以大概得出如何解决,可以先尝试将liveness检测的timeout时间增加。

kubectl -n rook-ceph edit deployment rook-ceph-rgw-my-store-a

把liveness的timeout时间调成5s,这样就解决了这个问题。

理解go context

理解context

在我刚接触context包时,我是有一点迷惑的。因为在其他的编程语言中很少有接触到context包类似的用法。比如在js绘制canvas中的context,也只是作为保留上下文操作来用的。在go语言的context包中,同样也可以当成上下文来理解,但是在看待context提供的能力时,要从以下两点来理解:

  • context提供了一种管理多个goroutine的机制。
  • context最终形成了一种树形结构。

在深入之前,让我们回忆一下多线程/进程模型中,主线程/进程是如何管理子线程/进程的。如果子线程/进程又派生了其他的线程/进程呢?这一定是一个头疼的问题。

在go语言中,协程也面临了同样的问题。因此官方在go1.7版本中引入了context包。那么context提供了什么样的能力来管理协程呢?先看一个withCancel的简单的例子

func watch(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            log.Println("退出")
            return

        default:
            log.Println("执行逻辑")
            time.Sleep(2 * time.Second)
        }
    }
}

func withCancel() {
    ctx, cancel := context.WithCancel(context.Background())

    go watch(ctx)
    go watch(ctx)
    go watch(ctx)

    time.Sleep(6 * time.Second)
    fmt.Println("可以了,通知子协程停止")
    cancel()
    //为了检测子协程是否停止,如果没有输出,就表示停止了
    time.Sleep(5 * time.Second)
}

调用withCancel的输出如下:

2019/09/30 00:18:48 执行逻辑
2019/09/30 00:18:48 执行逻辑
2019/09/30 00:18:48 执行逻辑
2019/09/30 00:18:50 执行逻辑
2019/09/30 00:18:50 执行逻辑
2019/09/30 00:18:50 执行逻辑
2019/09/30 00:18:52 执行逻辑
2019/09/30 00:18:52 执行逻辑
2019/09/30 00:18:52 执行逻辑
可以了,通知子协程停止
2019/09/30 00:18:54 退出
2019/09/30 00:18:54 退出
2019/09/30 00:18:54 退出

可以看到,我们通过context.WithCancel方法生成了一个ctx和一个cancel,然后主动调用cancel就可以通过所有的子协程退出了。在watch方法的实现中,我们是通过select机制来实现的,一旦context的Done()方法有值,就会调用return退出,否则的话就执行default中我们的业务逻辑。

这样我们就可以随时通知所有的子协程退出了。在上面说到,context最终形成了一种树形结构,是因为在子协程中也可以继续使用新的协程,这样就形成了一个树形的调用了。

协程的树形结构

我们在1中使用cancel方法,就可以向下传播,在2~10号协程中全部退出。

简单的了解context包的使用后,可以看一下context.Context这个接口,为了简洁,我删除了源代码中的注释。

type Context interface {
    Deadline() (deadline time.Time, ok bool)
    Done() <-chan struct{}
    Err() error
    Value(key interface{}) interface{}
}

Context接口总共提供了4个方法。

  • Deadline()用来获取当前context的取消时间,第二个返回值ok等于false的时候,表示没有设置。
  • Done()方法返回了一个chan,当chan中读取到值的时候,表示父context已经发起了取消的请求,那么当前协程开始做相关的清理工作然后退出。
  • Err()返回context的取消原因
  • Value()方法用来通过一个key获取当前Context上与之对应的值。

理解Context的树形结构

go中大量的库都使用了context机制,比如database/sql库,net/http库等等,因为这些库都支持了context,使得我们在程序中很容易通过context来管理所有新建的协程,而不用自己实现复杂的机制来管理。一旦我们需要取消,只需要在root context调用cancel方法即可。

一些基本使用

在上面的例子中,我们使用了withCancel来实例化一个可以手动取消的context。context包中同样提供了一些其他的方法。

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

WithDeadline可以设置截止时间。会到达指定时间时自动取消。当然也可以调用CancelFunc来手动取消。

WithTimeout可以设置在一段时间后自动取消,和WithDeadline类似。

参考

Go语言实战笔记(二十)
Golang Context深入理解
Go Concurrency Patterns: context

etcd分布式锁的实现方式

一、概述

在etcd的clientv3包中,实现了分布式锁。使用起来和mutex是类似的,为了了解其中的工作机制,这里简要的做一下总结。

二、使用方式

etcd分布式锁的实现在go.etcd.io/etcd/clientv3/concurrency包中,主要提供了以下几个方法:

  • func NewMutex(s *Session, pfx string) *Mutex, 用来新建一个mutex
  • func (m *Mutex) Lock(ctx context.Context) error,它会阻塞直到拿到了锁,并且支持通过context来取消获取锁。
  • func (m *Mutex) Unlock(ctx context.Context) error,解锁

因此在使用etcd提供的分布式锁式非常简单,通常就是实例化一个mutex,然后尝试抢占锁,之后进行业务处理,最后解锁即可。

一个简单的例子如下:

package main

import (
    "context"
    "github.com/coreos/etcd/clientv3"
    "github.com/coreos/etcd/clientv3/concurrency"
    "log"
    "sync"
    "time"
)

var n = 0

// 使用worker模拟锁的抢占
func worker(key string) error {
    endpoints := []string{"127.0.0.1:2379"}

    cfg := clientv3.Config{
        Endpoints:            endpoints,
        DialTimeout:          3 * time.Second,
    }

    cli, err := clientv3.New(cfg)
    if err != nil {
        log.Println("new cli error:", err)
        return err
    }

    sess, err := concurrency.NewSession(cli)
    if err != nil {
        return err
    }

    m := concurrency.NewMutex(sess, "/"+key)

    err = m.Lock(context.TODO())
    if err != nil {
        log.Println("lock error:", err)
        return err
    }

    defer func() {
        err = m.Unlock(context.TODO())
        if err != nil {
            log.Println("unlock error:", err)
        }
    }()

    log.Println("get lock: ", n)
    n++
    time.Sleep(time.Second) // 模拟执行代码


    return nil
}

func main() {
    var wg sync.WaitGroup
    wg.Add(3)
    go func() {
        defer wg.Done()
        err := worker("lockname")
        if err != nil {
            log.Println(err)
        }
    }()


    go func() {
        defer wg.Done()
        err := worker("lockname")
        if err != nil {
            log.Println(err)
        }
    }()

    go func() {
        defer wg.Done()
        err := worker("lockname")
        if err != nil {
            log.Println(err)
        }
    }()

    wg.Wait()
}

三、实现机制

Lock()函数的实现很简单。这里可以贴出来看一下:

// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
    s := m.s
    client := m.s.Client()

    m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
    cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
    // put self in lock waiters via myKey; oldest waiter holds lock
    put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
    // reuse key in case this session already holds the lock
    get := v3.OpGet(m.myKey)
    // fetch current holder to complete uncontended path with only one RPC
    getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
    resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
    if err != nil {
        return err
    }
    m.myRev = resp.Header.Revision
    if !resp.Succeeded {
        m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
    }
    // if no key on prefix / the minimum rev is key, already hold the lock
    ownerKey := resp.Responses[1].GetResponseRange().Kvs
    if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
        m.hdr = resp.Header
        return nil
    }

    // wait for deletion revisions prior to myKey
    hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
    // release lock key if wait failed
    if werr != nil {
        m.Unlock(client.Ctx())
    } else {
        m.hdr = hdr
    }
    return werr
}

首先通过一个事务来尝试加锁,这个事务主要包含了4个操作: cmpputgetgetOwner。需要注意的是,key是由pfxLease()组成的。

  • cmp: 比较加锁的key的修订版本是否是0。如果是0就代表这个锁不存在。
  • put: 向加锁的key中存储一个空值,这个操作就是一个加锁的操作,但是这把锁是有超时时间的,超时的时间是session的默认时长。超时是为了防止锁没有被正常释放导致死锁。
  • get: get就是通过key来查询
  • getOwner: 注意这里是用m.pfx来查询的,并且带了查询参数WithFirstCreate()。使用pfx来查询是因为其他的session也会用同样的pfx来尝试加锁,并且因为每个LeaseID都不同,所以第一次肯定会put成功。但是只有最早使用这个pfxsession才是持有锁的,所以这个getOwner的含义就是这样的。

接下来才是通过判断来检查是否持有锁

m.myRev = resp.Header.Revision
if !resp.Succeeded {
    m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
    m.hdr = resp.Header
    return nil
}

m.myRev是当前的版本号,resp.Succeededcmp为true时值为true,否则是false。这里的判断表明当同一个session非第一次尝试加锁,当前的版本号应该取这个key的最新的版本号。

下面是取得锁的持有者的key。如果当前没有人持有这把锁,那么默认当前会话获得了锁。或者锁持有者的版本号和当前的版本号一致, 那么当前的会话就是锁的持有者。

// wait for deletion revisions prior to myKey
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if wait failed
if werr != nil {
    m.Unlock(client.Ctx())
} else {
    m.hdr = hdr
}

上面这段代码就很好理解了,因为走到这里说明没有获取到锁,那么这里等待锁的删除。

// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision.
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
    getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
    for {
        resp, err := client.Get(ctx, pfx, getOpts...)
        if err != nil {
            return nil, err
        }
        if len(resp.Kvs) == 0 {
            return resp.Header, nil
        }
        lastKey := string(resp.Kvs[0].Key)
        if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
            return nil, err
        }
    }
}

waitDeletes方法的实现也很简单,但是需要注意的是,这里的getOpts只会获取比当前会话版本号更低的key,然后去监控最新的key的删除。等这个key删除了,自己也就拿到锁了。

这种分布式锁的实现和我一开始的预想是不同的。它不存在锁的竞争,不存在重复的尝试加锁的操作。而是通过使用统一的前缀pfx来put,然后根据各自的版本号来排队获取锁。效率非常的高。

etcd 分布式锁

如图所示,共有4个session来加锁,那么根据revision来排队,获取锁的顺序为session2 -> session3 -> session1 -> session4。

当然,这里为什么可以通过revision来判定获取锁的顺序,就需要更深入的了解etcd的内部机制以及raft协议了。

分布式文件上传方案

-## 一、背景
考虑可扩展性,后台的服务肯定是要能够支持任意的扩展的,这样才能在业务量增长时通过增加机器的方式来应对。这对后台服务提出了一个要求,必须处理好分布式环境和单机环境的不同带来的问题。比如:在文件的分片上传这一场景下,应该负载均衡的问题,一个文件的多个分片请求会分布到不同的服务器上,这导致在将多个分片合并成完整文件时出现问题,而单机情况下则完全不会有这样的问题。

二、难点

这个问题的解决方案有很多种,但是需要根据实际情况尽量选择简洁、易部署和维护的方案进行,并且不能丢掉分布式系统的优点。比如网上的有的方案是使用单独的文件上传服务器,但是这就变成了单机服务了。也有使用NFS挂载的方案,即所有的服务器挂载一个相同的NFS目录,所有上传相关的文件都存放在挂载的目录下,这不仅给运维带来了麻烦,为了保证NFS的高可用,也带来了额外的运维成本。

三、解决方案

3.1 借助负载均衡

借助负载均衡的方案很简单,这是和应用无关的一种方法。即通过负载均衡这一层,将同一个文件的不同分片的请求全部导向到同一个服务器上。比如负载均衡这一块使用的是nginx, 通过url hash的方式来完成,可以使用如下的配置:

upstream backend {
    server 0.0.0.0:8080;
    server 0.0.0.0:8081;
    server 0.0.0.0:8082;
    hash request_uri;
}
server {
    location / {
        proxy_pass http://backend;
        proxy_set_header X-Real-IPremote_addr;
        proxy_set_header Host $host;
    }
}

然后写一个简单的服务来验证一下:

func post(resp http.ResponseWriter, req *http.Request) {
    v := req.PostFormValue("key")
    identity := req.PostFormValue("identity")
    log.Printf("identity: %v, value: %v\n", identity, v)

    resp.WriteHeader(http.StatusOK)
    _, err := resp.Write([]byte("ok"))
    if err != nil {
        log.Println("error:", err)
    }
}

func main() {
    var addr string
    flag.StringVar(&addr, "addr", "0.0.0.0:8080", "http listen addr:port")
    flag.Parse()

    http.HandleFunc("/Post", post)
    log.Println("listen ", addr)
    err := http.ListenAndServe(addr, nil)
    if err != nil {
        log.Fatal(err)
    }
}

我们启动三个服务:

./godemo -addr 0.0.0.0:8080
./godemo -addr 0.0.0.0:8081
./godemo -addr 0.0.0.0:8082

使用curl来post数据过来,请求的地址类似于: http://0.0.0.0/Post?123456?后面可以当成是文件的唯一标识码,比如文件和用户id的组合的md5信息等,这样对于同一个用户上传的同一个文件的不同分片请求,通过url哈希都会得到同样的结果,这样就会被转发到同一个后端服务器。

curl http://0.0.0.0/Post\?123456 -d "key=value&identity=123456" -X POST
curl http://0.0.0.0/Post\?123456 -d "key=value&identity=123456" -X POST
curl http://0.0.0.0/Post\?123456 -d "key=value&identity=123456" -X POST
curl http://0.0.0.0/Post\?789abc -d "key=vvvvv&identity=789abc" -X POST
curl http://0.0.0.0/Post\?789abc -d "key=vvvvv&identity=789abc" -X POST
curl http://0.0.0.0/Post\?789abc -d "key=vvvvv&identity=789abc" -X POST

结果如下:

端口8080的服务收到了三条请求:
2019/09/26 13:58:52 identity: 123456, value: value
2019/09/26 13:58:56 identity: 123456, value: value
2019/09/26 13:59:03 identity: 123456, value: value

端口8081的服务收到了三条请求:
2019/09/26 13:59:40 identity: 789abc, value: vvvvv
2019/09/26 13:59:43 identity: 789abc, value: vvvvv
2019/09/26 13:59:44 identity: 789abc, value: vvvvv

如果在k8s集群中部署,使用nginx-ingress的话可以使用以下的部署方案:

apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
    name: pipeline-ingress
    namespace: default
    annotations:
      nginx.ingress.kubernetes.io/proxy-body-size: "50m"
      nginx.ingress.kubernetes.io/upstream-hash-by: "$request_uri"
spec:
    rules:
        - host: pipeline.dev.com
          http:
              paths:
                  - path: / 
                    backend:
                        serviceName: pipeline-service
                        servicePort: 8888

当然这种方式需要注意你的每个请求的URI都需要加上额外的参数(比如用户的userId的md5),这样才能均匀的分布到不同的服务器上。

3.2 后台程序自动proxy请求

这个方案的思路是集群中的每个服务实例都有单独的标识,文件分片在第一次上传时会返回给它一个该请求所属服务器的identity,之后所有的请求会带上这个identity,之后收到请求的服务器会检查这个identity是不是属于自己,如果不属于自己,则把这个请求转发给所属的服务器。

这个方案要求每台服务器都知道其他所有服务器的identity和地址。这里我们可以使用etcd这样的分布式数据库来存储。每台服务器在启动时都向etcd里注册自己的identity和address,之后服务器转发的时候都向etcd里面查找对应的address即可。

下面是一个示例的代码:

package main

import (
    "context"
    "encoding/json"
    "flag"
    "io/ioutil"
    "log"
    "net/http"
    "go.etcd.io/etcd/clientv3"
    "net/url"
    "time"
)

type Server struct {
    Etcd string
    Addr string
    Identify string
}

type ResponseObj struct {
    Identity string `json:"identity,omitempty"`
    Value string `json:"value"`
}

func getEtcdKV(etcd string) (clientv3.KV, error) {
    cfg := clientv3.Config{
        Endpoints:               []string{etcd},
        // set timeout per request to fail fast when the target endpoint is unavailable
        DialTimeout: time.Second,
    }

    cli, err := clientv3.New(cfg)

    if err != nil {
        return nil, err
    }

    return clientv3.NewKV(cli), nil
}

func httpProxy(anotherServer string, body map[string]string) (*http.Response, error) {
    formData := url.Values{}

    for k, v := range body {
        formData.Set(k ,v)
    }

    return http.PostForm(anotherServer, formData)
}

func (s *Server) Register() error {
    cli, err := getEtcdKV(s.Etcd)
    if err != nil {
        return err
    }

    ctx, cancel := context.WithTimeout(context.Background(), time.Duration(1)*time.Second)
    _, err = cli.Put(ctx, s.Identify, "http://" + s.Addr)
    cancel()
    if err != nil {
        return err
    }

    return nil
}

func (s *Server) Post(resp http.ResponseWriter, req *http.Request) {
    v := req.PostFormValue("key")
    identity := req.PostFormValue("identity")
    log.Printf("identity: %v, value: %v\n", identity, v)

    var data ResponseObj

    if identity != "" && identity != s.Identify {
        // proxy
        cli, err := getEtcdKV(s.Etcd)
        if err != nil {
            log.Println("get kv client error", err)
            resp.WriteHeader(http.StatusInternalServerError)
            resp.Write([]byte("error"))
            return
        }

        etcdResp, err := cli.Get(context.Background(), identity)
        if err != nil {
            log.Println("get value error: ", err)
            resp.WriteHeader(http.StatusInternalServerError)
            resp.Write([]byte("error"))
            return
        }

        if len(etcdResp.Kvs) == 0 {
            log.Println("没有值")
            resp.WriteHeader(http.StatusInternalServerError)
            resp.Write([]byte("error"))
            return
        }

        proxyAddr := string(etcdResp.Kvs[0].Value)

        log.Println("proxy to: ", proxyAddr)

        text := map[string]string {
            "key": v,
            "identity": identity,
        }

        proxyResp, err := httpProxy(proxyAddr+"/Post", text)
        if err != nil || proxyResp.Body == nil {
            log.Println("http proxy error: ", err)
            resp.WriteHeader(http.StatusInternalServerError)
            resp.Write([]byte("error"))
            return
        }

        bodyData, err := ioutil.ReadAll(proxyResp.Body)
        if err != nil {
            log.Println("read proxy body error: ", err)
            resp.WriteHeader(http.StatusInternalServerError)
            resp.Write([]byte("error"))
            return
        }

        err = json.Unmarshal(bodyData, &data)
        if err != nil {
            log.Println("json unmarshal error: ", err)
            resp.WriteHeader(http.StatusInternalServerError)
            resp.Write([]byte("error"))
            return
        }
    } else {
        data.Identity = s.Identify
        data.Value = v
    }

    text, err := json.Marshal(data)
    if err != nil {
        log.Println("marshal json error: ", err)
        resp.WriteHeader(http.StatusInternalServerError)
        return
    }

    resp.WriteHeader(http.StatusOK)
    _, err = resp.Write(text)
    if err != nil {
        log.Println("error:", err)
    }
}

func main() {
    var addr string
    var identity string
    var etcd string
    flag.StringVar(&addr, "addr", "0.0.0.0:8080", "http listen addr:port")
    flag.StringVar(&identity, "identity", "", "identify the server")
    flag.StringVar(&etcd, "etcd", "http://127.0.0.1:2379", "etcd server url")
    flag.Parse()

    if identity == "" {
        log.Fatal("identity不可为空")
    }

    var server = &Server{
        Addr:     addr,
        Identify: identity,
        Etcd: etcd,
    }

    err := server.Register()
    if err != nil {
        log.Fatal("register server error, check your etcd server: ", err)
    }

    http.HandleFunc("/Post", server.Post)
    log.Println("listen ", addr)
    err = http.ListenAndServe(addr, nil)
    if err != nil {
        log.Fatal(err)
    }
}

3.3 基于S3存储的方案

兼容S3的存储系统有一个对外的接口叫做: ComposeObject。这个接口可以将多个文件合并成一个文件存储到指定位置。如果我们的底层存储是基于兼容S3的系统,那么就可以利用这个接口轻松的实现。

方案的步骤可以描述成以下:

  1. 前端实现将文件分片,然后上传
  2. 上传的请求会因为前端负载均衡的原因分布到不同的服务器,每个上传请求都要带上这个文件、用户id、随机字符串组合的md5值,服务器收到请求后,只管将分片上传到以md5值为名的目录下,一旦上传成功,使用etcd或redis这样作为分片计数器+1.
  3. 分片总数等于总分片数时调用ComposeObject组合所有分片存储到指定位置即可。

ceph架构研究

这篇文章的绝大多数内容都是官网原文的翻译:Ceph Architecture

ceph是什么

ceph是一个分布式的文件存储系统,它提供了以下三种存储能力:

  • 块设备存储(block device)
  • 文件系统存储(filesystem)
  • 对象存储(object storage)

因此如果你需要多种存储方案的话,ceph是一个非常好的选择。这几种存储方案的差别是什么呢?

块设备存储是将底层的存储能力以逻辑硬盘的方式暴露给主机使用。主机只感知到单块物理硬盘的挂载,底层的复杂机制都被屏蔽

对象存储是将文件抽象成对象,然后通过网络接口(比如s3的api)来对文件进行操作(put,get,delete等)。

文件系统存储是像FTP,NFS这种,可以将ceph的存储能力当成普通的文件系统来使用,也支持cd, ls这样的文件系统命令。

ceph filesystem

ceph 架构一览

ceph arch

通过上图可以看出,ceph的底层核心是RADOS,然后通过LIBRADOSCEPHFS以及基于LIBRADOS的RADOSGW, RBD对外提供存储功能。

ceph 存储集群

ceph基于RADOS提供了无限的可扩展的存储集群。关于RADOS的知识可以看这篇论文: RADOS – A Scalable, Reliable Storage Service for Petabyte-scale Storage Clusters

一个Ceph存储集群包括两类常驻服务:

  • Ceph Monitor
  • Ceph OSD Daemon

ceph daemons

Ceph Monitor维护了一个关于集群信息映射的主副本。Ceph monitors集群保证了在一个monitor服务停止时的高可用性。存储集群客户端从Ceph Monitor获取一份集群信息映射的拷贝。

Ceph OSD服务负责检查自身的状态以及其他OSD的状态,然后汇报给monitors。

存储集群客户端和每一个Ceph OSD服务使用CRUSH算法来高效的计算数据位置的信息,而不是依赖于一个中心化的查找表。librados是一个很重要的角色,它提供了Ceph的高层特性,一系列的服务接口都是建设在librados上层的。

存储数据

Ceph存储集群从Ceph客户端接受数据,这里的客户端可以是Ceph块设备存储,对象存储,文件系统或者是基于librados的自定义实现。librados将数据作为一个对象存储。每一个对象都在文件系统中对应一个文件,这些文件存储在对象存储设备上。Ceph OSD服务在存储磁盘上处理这些读写操作。

osd handle read/write operations

Ceph OSD服务在一个扁平的命名空间中(没有目录结构)存储所有的数据,每个数据都作为一个对象。一个对象有唯一标识符,二进制数据,以及有一系列键值对的元数据。这完全取决于Ceph客户端的实现。举例来说,CephFS使用元数据来存储文件属性,比如文件拥有者,创建日期,最后修改日期等等。

ceph object

注意: 对象ID是在整个集群中唯一的,而不仅仅是本地文件系统。

扩展性和高可用

在传统架构中,客户端和一个中心化的组件(比如gateway, broker, API, facade等)通信。这个中心化组件在一个复杂系统中扮演了一个单点的入口。这在性能和扩展性上都导致了局限性————单点失败。(比如,如果中心组件宕机,整个集群都不可用)。

Ceph消灭了这个中心化的入口,让客户端直接和Ceph OSD服务通信。Ceph OSD服务在其他Ceph节点上创建对象副本来保证数据安全和高可用。Ceph同样使用一个monitor的集群来保证高可用。为了消灭中心化,Ceph使用了一个叫做CRUSH的算法。

CRUSH介绍

Ceph客户端和Ceph OSD服务都使用CRUSH算法来高效的计算对象存储位置信息,而不是必须取决于一个中心化的查找表。相比于之前的方案,CRUSH提供了一个更好的数据管理机制,通过清晰的将工作分布到集群中的所有客户端和OSD服务,可以达到很好的扩展性。CRUSH使用智能的数据复制来保证灵活性,这更适用于超大规模的存储。

集群映射表

Ceph依赖于Ceph客户端和OSD服务,它们包含了5个映射表。

  • 监控映射表: 包含集群的fsid, 位置, 名字地址以及每个monitor的端口。它同样记录了当前的代数,映射表的创建时间,最后一次修改时间。可以使用ceph mon dump来查看一个监控表

  • OSD映射表: 包含集群的fsid, 这个映射表的创建时间和最后修改时间,pools的列表,副本数量,PG数量,OSD的列表以及他们的状态。使用ceph osd dump来查看osd表。

  • PG映射表: 包含PG版本号, 时间戳,最新的OSD映射表的代数,全比率,每一个放置组的详情,比如PG ID, Up Set, Acting Set,以及PG的状态(比如active + clean),每一个pool的数据使用统计信息。

  • CRUSH映射表: 包含存储设备的列表,失败域的层级(比如device,host,rack,row,room等),存储数据时的遍历层级的规则。

  • MDS映射表: 包含当前的MDS映射表的代数, 映射表的创建时间,最后修改时间。它同样包含存储元数据的池,元数据服务器的列表,那些元数据服务器是up和in的。执行ceph fs dump来查看MDS映射表。

每一个映射表都包含它的操作状态改变的可迭代历史。Ceph Monitors维护一个集群映射表的主拷贝,包含了集群成员,状态,改变,以及Ceph存储集群的全局健康状态。

高可用的监控服务

在Ceph客户端可以读或写数据前,它们都必须和一个Ceph监控服务通信来获取最新的集群映射表的拷贝。一个Ceph存储集群可以只有一个Monitor角色。当然,这会导致单点失败。

为了增加可靠性和错误容忍,Ceph支持监控服务的集群。在一个监控服务的集群中,延迟和其他错误会导致一个或多个监控服务落后于集群的当前状态。因此,Ceph必须在各个监控服务实例之间就集群状态达成一致。Ceph总是相信大多数的监控服务。Paxos算法被用来就当前集群状态达成共识。

高可用的认证

为了鉴别用户以及防止中间人攻击,Ceph提供了cephx认证系统来认证用户和后台服务。

cephx协议不致力于传输过程中数据加密(比如SSL/TLS)或者是其他的加密。

Cephx使用共享的秘钥来认证,这意味着客户端和监控集群都有一份客户端秘钥的拷贝。认证协议需要双方都能证明给对方自己拥有这个秘钥的拷贝,但是又不能把秘钥原文说出来。这提供了两端的验证,意味着集群确信用户有这个秘钥,用户也确信集群有这个秘钥。

Ceph的一个关键的可扩展的特性是避免对Ceph object store的实现有中心化接口,这意味着Ceph客户端必须能够和OSD直接通信。为了保护数据,Ceph提供了cephx认证系统。cephx协议的机制和Kerberos类似。

用户调用Ceph客户端来和监控服务器通信。和Kerberos不同的是,每一个监控服务都可以验证用户以及分发私钥,所以在使用cephx时不再有单点失败或瓶颈了。监控服务返回一个认证数据结构,类似于Kerberos ticket,包含获取Ceph服务的会话秘钥。会话秘钥被用户永久秘钥加密过,因此只有用户可以向Ceph监控服务请求服务。之后客户端使用会话秘钥来请求它想要的服务,监控服务向客户端提供一个ticket,这个ticket可以向OSD认证客户端来处理数据。
Ceph Monitors和OSD共享秘钥,因此客户端可以使用监控服务提供的ticket和任意的OSD或元数据服务器通信。和Kerberos一样,cephx ticket会过期,因此攻击者不能使用过期的ticket或会话秘钥来获取服务。

为了使用cephx,管理员必须首先设置用户。在下图中,client.admin这个用户从命令行中调用ceph auth get-or-create-key来生成用户名和秘钥。Ceph的认证系统生成用户名和秘钥,存储一份拷贝给所有的监控服务,然后将用户秘钥传送给client.admin用户。这意味着客户端和监控服务共享同一个秘钥。

request create user

为了通过监控服务的认证,客户端把用户名发送给监控服务,监控服务生成一个会话钥匙,使用秘钥附加上用户名来加密。之后,监控服务把加密后的ticket返回给客户端。客户端使用共享的秘钥解密,获取到会话钥匙。会话钥匙为当前的会话提供认证。客户端之后请求由这个会话钥匙签名的ticket。监控服务生成ticket,使用用户秘钥加密并返回给客户端。客户端解密ticket,使用它来签名给OSD和元数据服务器的请求。

client to osd

cephx协议认证每个在客户端机器和Ceph服务器之间的会话。每一个在客户端和服务器之间发送的信息,都会接连通过初始化认证,使用ticket签名,这个签名可以由监控服务,OSD和元数据服务器用他们共享的秘钥来验证。

complete process

由这种认证提供的保护是在Ceph客户端和Ceph服务端之间的。在Ceph客户端之外这个认证是不管用的。如果用户通过远程连接到Ceph客户端上,Ceph的认证不会应用到这个远程连接上。

智能的后台服务造就了超大规模的可扩展性

在很多集群架构中,集群成员的主要目的是通过一个中心化的接口知道哪些节点可以访问。然后中心化的接口通过二次转发来提供服务,这回导致在pb级别向eb级别扩展时有很大的瓶颈。

Ceph消灭了这个瓶颈:Ceph的OSD服务和Ceph客户端是集群感知的。比如Ceph客户端、Ceph OSD服务都知道集群中的其他Ceph OSD服务。这使得Ceph OSD服务可以直接和其他Ceph OSD服务以及Ceph监控服务通信。另外,它使得Ceph客户端可以直接和Ceph OSD服务直接通信。

这种方法也最大化的利用了节点的CPU和RAM, 带来了以下几个主要的好处:

  • OSD 为客户端直接提供服务:因为任何网络设备都有最大并发连接的瓶颈,一个中心化的系统在高扩展的情况下会出现一个很低的物理瓶颈。通过让Ceph客户端直接和Ceph OSD服务通信, Ceph同时增加了性能和系统容量,并且解决了单点失败的问题。Ceph客户端可以维护一个和Ceph OSD服务的会话,而不是一个中心化的系统,只要它们需要这样做。

  • OSD成员和状态:Ceph OSD服务加入到集群中并且汇报它们的状态。在最低级别上,Ceph OSD服务状态是up或者down的,代表着它是否能够处理Ceph客户端的请求。如果Ceph OSD服务是down的,这意味着Ceph OSD服务的异常。如果Ceph OSD服务不在运行(crash了),Ceph OSD服务就不能通知Ceph监控服务它停止运行了。OSD服务周期性的给Ceph监控服务发送消息,如果Ceph监控服务在约定的周期内没有收到OSD服务的消息,它就把这个OSD服务标记为down。这种机制叫做failsafe(failsafe可以理解可以故障的自动保护装置,当OSD出现故障会自动将其排除从而避免引发更大的故障)。当然,OSD服务也会查看它相邻的OSD服务是否停止,然后汇报给Ceph监控服务。这保证了Ceph监控服务是一个轻量级的进程(脏活累活都被OSD服务做了)。

  • 数据清理:作为维护数据一致性和清洁度的一部分,Ceph OSD服务可以在PG(placement group, 放置组)中清理对象。Ceph OSD服务可以将一个PG中的对象和存储在另一个OSD服务中的副本进行元数据的对比。清理操作(通常是每天)会找出bug或文件系统的错误。Ceph OSD服务同样会执行更深层次的清理,通过对对象执行按位的对比。深层次的清理(通常是每周)会找到硬盘上坏的扇区。

  • 数据复制:像Ceph客户端一样,Ceph OSD服务使用CRUSH算法,但是Ceph OSD服务使用它来计算对象的副本应该存储在哪里(或者是重新负载均衡)。在一个典型的写操作场景下,一个客户端使用CRUSH算法来计算对象存储在哪里,将这个对象映射到pool和PG中,然后查看CRUSH映射表来鉴别PG组的主OSD服务

    客户端将对象写入到主OSD服务的PG组中。之后,主OSD携带着他自己的CRUSH映射表的拷贝存储到第二和第三个OSD服务中,这是为了多副本的备份。一旦它确认数据存储成功就返回给客户端。

osd write for replication

因为有这样的数据复制的能力,Ceph OSD服务使得Ceph客户端不需要负责这件事,同时也能保证高的数据可用性和数据安全。

动态集群管理

在高可用和可扩展性章节,我们解释了Ceph如何使用CRUSH、集群感知、智能服务扩展以及维护高可用。Ceph的关键设计就是自治、自我恢复、智能Ceph OSD服务。下面我们更深入的探究一下CRUSH如何在现代化的云存储架构中发挥作用,包括存储数据、集群的重新负载均衡以及动态的从错误中恢复。

关于pool

Ceph存储系统支持一个叫做pool的概念,这是存储对象的逻辑分区。

Ceph客户端从Ceph监控服务中获取集群映射表,然后向pool中写入对象。pool的大小或副本数量、CRUSH的规则以及PG的数量决定了Ceph如何存储数据。

how crush work

pool至少会设置以下参数:

  • 对象的拥有者/访问权限
  • PG的数量
  • 使用的CRUSH规则

映射PG到OSD

每一个pool都有一系列的PG。CRUSH动态的将PG映射到OSD中。当Ceph客户端存储对象,CRUSH将会映射每一个对象到一个PG中。

将对象映射到PG创建了一个中间层,这个中间层位于Ceph OSD服务和Ceph客户端中间。在动态的存储对象时,Ceph存储系统必须能够增长(或收缩)以及重新负载均衡。如果Ceph客户端知道哪个Ceph OSD服务拥有哪一个对象,这就导致Ceph客户端和Ceph OSD服务紧耦合了。相反的,CRUSH算法将每一个对象映射到PG,然后将每一个PG映射到一个或多个OSD服务。这个中间层允许在有新的OSD服务以及新的底层OSD设备上线时,Ceph可以动态重新负载均衡。下面的图表演示了CRUSH如何映射对象到PG,然后映射PG到OSD。

crush mapping

客户端在有集群映射表的拷贝和CRUSH算法的情况下,可以准确的计算出在读取或写入对象时使用哪一个OSD。

计算PG ID

当一个Ceph客户端绑定到一个Ceph监控服务时,它会获取集群映射表的最新拷贝。有了这个集群映射表,客户端知道所有的监控服务,OSD,元数据服务。然而,它并不知道任何关于对象的位置信息。

对象位置需要计算

客户端唯一需要的输入是对象ID和pool。这很简单:Ceph在命名的pool中存储数据。当客户端存储一个命名的对象时,它使用对象名、一个哈希码、pool中PG的数量和pool名来计算PG。Ceph客户端使用以下步骤来计算PG ID。

  • 1.客户端输入pool名和对象ID(比如: pool=”liverpool”, object-id=”john”)

  • 2.Ceph获取对象ID,然后做一下哈希操作

  • 3.Ceph计算哈希值除PG数量的余数,(比如58)得到了PG ID

  • 4.Ceph获取给定的pool名的pool ID(比如”liverpool”=4)

  • 5.Ceph将pool ID放在PG ID之前(比如4.58)

在一个忙碌的会话中,计算对象位置要比执行对象位置查询快很多。CRUSH算法允许客户端计算对象应该存在哪里,这使得客户端可以直接和主OSD服务通信,存储或获取对象。

peering and sets

在之前的章节中,我们提到Ceph OSD服务检查彼此的心跳,然后汇报给Ceph监控服务。Ceph OSD做的另外一件事叫做”窥探(peering)”,这会让所有存储PG的OSD对PG中的对象状态达成一致。事实上,Ceph OSD服务也会汇报”peering”的失败给Ceph的监控服务。Peering的问题通常会自我解决。

对状态达成一致并不意味着PG有最新的内容

Ceph存储集群被设置成一个对象至少有两个副本(比如, size=2),这是数据安全的最小要求。为了高可用,Ceph存储集群应该存储多于两个的副本(比如size=3并且最小size=2)。这样它就能在维护数据安全的时候以一个降级状态继续运行。

重新负载均衡

当你添加一个Ceph OSD服务到集群中,集群映射表会随之更新。根据之前的计算PG ID的方法,这会改变集群映射表。于是,它改变了对象的位置,因为它改变了计算的一个输入参数。下面的图标描述了重新负载均衡的过程。有一些但不是所有的PG会从已存在的OSD(OSD1和OSD2)中迁移到新的OSD(OSD3)中(这个过程虽然看似很简单粗暴,但其实对大型系统影响很小)。即使在重新负载均衡的过程中,CRUSH仍然是稳定可靠的。大多数PG保留它们原有的配置,每一个OSD获取到更多的可用容量,所以在重新负载均衡之后新的OSD没有负载峰值。

ceph rebalance

数据一致性

作为维护数据一致性和清洁度的一部分,Ceph OSD同样可以在PG中清理对象。关于OSD如何维持各个副本的对象一致性,可以参考上面的数据清理。

纠删码(erasure coding)

一个纠删码pool将每个对象分成K+M个块。数据被分为K个数据块和M个编码块。这个pool被配置成K+M的大小,因此每个块都可以存在一个OSD上。块的排序作为对象的属性存储起来。

举例来说,一个使用5个OSD(K+M=5)的纠删码pool可以容忍2块数据的丢失(M=2)。

读写编码块

当一个叫做NYAN的对象包含”ABCDEFGHI”的内容被写入到pool中,擦除编码功能将内容分成3个数据块:第一部分是ABC,第二部分是DEF,第三部分是GHI。如果内容长度不是K的整数倍,会被填充内容。这个功能同样创建两个编码块:第4个是YXY,第5个是QGC。每一个块都被存储到一个OSD中。这些块以同样的名字(NYAN)以对象的形式存储在不同的OSD上,块的创建顺序必须保留,存储对象的属性中(shard_t),作为名字的额外补充。块1包含ABC,存储在OSD5中,块4包含YXY存储在OSD3中。

erasure conding

当对象NYAN从纠删码pool中读取时,解码功能读取三个块: 块1包含ABC,块3包含GHI,块4包含YXY。然后它重建了对象的原始内容ABCDEFGHI。这个解码功能被通知块2和块5是缺失的(它们被称作’擦除’)。块5不会被读取是因为OSD4从集群中退出了。一旦3个块被读取解码功能就会被调用,而此时OSD2因为最慢所以根本没有被考虑进来。

erasure decode

中断完整的写入

在一个纠删码pool中,主OSD接受所有的写操作。它负责编码内容为K+M块,然后将它们发送给其他的OSD。它同样负责维护PG日志的版本号。

在下面的图表中,一个纠删码的PG被创建成K=2 M=1,它支持3个OSD,两个存储K,一个存储M,分别称为OSD1、OSD2、OSD3。一个对象被编码以及存储在OSD中,块D1v1(数据块序号1,版本1)在OSD1上,块D2v1在OSD2上,C1v1(编码块序号1,版本1)在OSD3上。每一个OSD上的PG日志都是唯一的(1,1 是epoch 1. version 1)

interrupted-fill-writes

OSD1是主服务,接受客户端的完整的写入,这意味着写入的数据要完整的替换对象而不是覆盖一部分。版本2(v2)的对象被创建来覆盖版本1(v1)。OSD1将数据编码到3块: D1v2在OSD1上,D2v2在OSD2上,C1v2在OSD3上。每一个块都被发送到目标OSD上。当OSD接受到消息要写入数据块时,它同样创建一个新的PG日志来反映这个改变。举例来说,只要OSD3存储了C1V2,它添加1,2到日志中。因为OSD是异步工作的,当其他块已经存储好了(比如C1v1和D1v1)一些数据块可能还在处理(比如D2v2)。

write replace

如果一切顺利,数据块都会被存储,日志中的last_complete指针会从1,1移动到1,2

full write success

最后,之前的版本都可以被移除了: OSD1上的D1v1,OSD2上的D2v1, OSD3上的C1v1

remove previous version

但是如果发生了异常,OSD1在D2v2仍然写入的时候停止了,对象的版本2只是部分写入:OSD3有一个块但是不足以恢复。它丢失了两个块: D1v2和D2v2,但是纠删码参数K=2,M=1要求至少2个块是可用的,才能恢复第三个块。OSD4成为新的主服务,找到last_complete日志是1,1,这应该是最新的日志了。

osd4 online

日志1,2被发现在OSD3上,这和在OSD4上存储的最新版本1,1不一样、因此1,2被取消,C1v2块被移除。D1v1块被解码功能重建出来存在OSD4上。

rebuild-on-osd4

缓存分层

缓存层提供给Ceph客户端更好的IO性能,因为一部分数据是存储在缓存中的。缓存层包括创建一个快速/昂贵的存储设备的pool,被配置成缓存层,便宜的存储设备被当成存储层使用。Ceph objecter负责处理将对象放在哪里,tiering代理决定什么时候将对象从缓存中写入到后面的存储层。因此缓存层和背后的存储层对Ceph客户端是完全透明的。

cache tiering

ceph协议

Ceph客户端使用本地的协议来和Ceph存储集群通信。Ceph将它的功能打包到librados库中,因此你可以创建自己的Ceph客户端。下面的图表描述了这种基础架构。

basic architecture

本地协议和librados

现在的应用程序需要一个支持异步通信的简单的对象存储接口。Ceph存储集群提供了这样的一个功能。接口提供了直接的,并行的对象访问功能。

  • Pool操作
  • 快照和写时复制克隆
  • 读写对象-创建和移除-整个对象或字节范围-追加或者截短
  • 创建/设置/获取/移除 XATTRS
  • 创建/设置/获取/移除 键值对
  • 复合操作和二次ack语义
  • 对象类

对象监视和通知

客户端可以注册一个对对象的持久关注,保持一个和主OSD的会话打开。客户端可以发送通知消息和一些负载信息给所有的监视者,然后所有监视客户端都会收到通知。这使得客户端可以使用任何对象作为同步通信通道。

watch and notify

数据分片

存储设备都会有吞吐量的限制,这对性能和可扩展性都有很大的影响。大多数存储系统都支持在多个存储设备上分别存储数据的一部分片段,这会增加吞吐量和性能。最常见的就是RAID了,Ceph的分片和RAID 0类似。

Ceph提供了三类客户端: Ceph块设备、Ceph文件系统、Ceph对象存储。Ceph客户端负责转换数据给用户(一个块设备镜像,RESTful对象,CephFS文件系统目录)。

提示: Ceph存储的对象不会被分片。Ceph对象存储,Ceph块设备,Ceph文件系统将它们的数据分片存储成多个Ceph存储系统对象。Ceph客户端通过librados写入数据必须执行分片(以及并行io)才能获取分片的优点。

最简单的Ceph分片格式包含一个对象的分片。Ceph客户端向存储对象写入分片单元,直到对象达到了它的最大容量,然后创建一个新的对象来存储多出来的数据分片。这种最简单的方式对于小的块设备镜像、s3或swift对象以及CephFS文件是足够的。当然,这种简单的方式没有最大化利用Ceph分布式存储数据的优点,串行的方式也没有提升性能。下面的图标描述了这种形式的分片:

simplest strip

如果你想要更大的镜像尺寸,更大的S3或Swift对象,或者更大的CephFS目录,你应该考虑通过将数据分片分布到一个对象集合中的多个对象上来提升读写性能。并行操作会显著的提升写入性能。因为对象会被映射到不同的PG中,然后被映射到不同的OSD中,每一个写入操作都会以最大的写入速度来并行操作。单硬盘的写入会因为柱头的移动和设备的最大带宽(100M/s)而受到限制(比如每次寻道都要6ms)。 通过将写入分布到多个对象上,Ceph可以减少每个硬盘的寻道时间,然后将多个硬盘的吞吐量结合以达到更快的写入(或读取)速度。

注意:分片是独立于对象复制的。因为CRUSH在OSD中复制对象,分片也会自动被复制。

在下图中,客户端数据被分片到一个对象集合中(对象集合1),这个集合包含4个对象,第一个分片单元是位于object 0 的 strip unit0,第4个分片单元是位于object3的strip unit3。当写如第4个分片后,客户端判断对象集合是否满了。如果对象集合没有满,客户端会从第一个对象继续写入分片。如果对象集合满了,客户端重新创建一个对象集合(object set2)。然后开始写入第一个分片(strip unit16)。

strip to multiple object

下面是三个重要的变量决定了Ceph如何对数据分片:

  • 对象大小: Ceph存储系统中的对象有一个配置的最大容量(比如2MB, 4MB)。这个对象大小应该足够大,以容纳很多个分片单元。

  • 分片宽度:分片有一个配置的单元大小(比如64kb)。Ceph客户端分割数据成同样大小的分片单元,除了最后一个。分片宽度应该是对象大小的一小部分,这样一个对象就可以包含多个分片单元了。

  • 分片数量:Ceph客户端在指定分片数量的对象上写入一连串的分片单元。这些指定数量的对象叫做一个对象集合。在Ceph客户端写完对象集合中的最后一个对象后,它又开始从对象集合中的第一个对象开始写入。

    重要:在投入到生产环境之前应该测试分片配置的性能。写入数据之后就不能修改这些配置了。

    一旦Ceph客户端将数据分片,然后将分片单元映射到对象中,Ceph的CRUSH算法将对象映射到PG中,然后PG被映射到Ceph OSD服务中。

    Ceph客户端

    Ceph客户端包含一系列的服务接口,包括:

    • 块设备:块设备(RBD)服务提供可变大小的、精简配置的块设备,块设备支持快照和复制。Ceph在整个集群中将块设备分片来提高性能。Ceph支持内核对象(KO)和QEMU的管理程序直接使用librbd——避免虚拟系统的内核对象过载。

    • 对象存储:对象存储(RGW)服务提供RESTful API,和Amazon S3以及OpenStack Swift兼容。

    • 文件系统:文件系统(CephFS)服务提供了POSIX兼容的文件系统,可以使用mount这样的命令,或者作为一个用户空间的文件系统。

Ceph可以运行OSD, MDS, 监控服务的额外实例,这些会提高可扩展性和高可用性。下面的图从高层架构上描述了这个情况:

high level architecture

Ceph对象存储

Ceph对象存储服务叫做radosgw,是一个FastCGi程序,提供了RESTful HTTP API来存储对象和元数据。它是在Ceph存储集群的顶层,拥有自己的数据格式,维护自己的用户数据库,认证和访问控制。RADOS入口使用统一的命名空间,这意味着你既可以使用OpenStack Swift兼容的API,也可以使用Amazon S3兼容的API。举例来说,你可以使用S3的API写入数据,使用Swift的API读取数据。

Ceph块设备

Ceph块设备将一个块设备镜像分片到多个Ceph存储集群的对象上,每一个对象都会映射到不同的PG,PG也会分布到不同的ceph-osd服务上。

精简配置的,可生成快照的Ceph块设备是虚拟化和云计算的吸引点。在虚拟机的场景下,人们可以在QEMU/KVM中使用rbd网络存储驱动来部署ceph块设备,主机使用librbd来给客户提供块设备服务。需要云计算栈使用libvirt来集成管理程序。你可以在QUME中使用精简配置的Ceph块设备,使用libvirt来支持OpenStack、CloudStack或其他解决方案。

因为我们目前不提供librbd支持其他的管理程序,你也同样可以使用Ceph块设备内核对象来提供一个块设备给一个客户端。其他的虚拟化技术,比如Xen可以访问Ceph块设备的内核对象。这可以使用命令行工具rbd完成。

Ceph文件系统

Ceph文件系统(CephFS)提供了POSIX兼容的文件系统,处于基于对象的Ceph存储集群之上。

ceph filesystem

Ceph文件系统服务包括Ceph元数据服务(MDS)。MDS的目的是存储所有的文件系统元数据(目录,文件拥有者,访问模式等等),元数据都是存储在内存中的。原因是MDS是的一般操作(ls, cd)会对Ceph OSD服务造成巨大的压力。因此将元数据和数据本身分离开是提供高性能服务的必要条件。

一些不错的文章

https://zhuanlan.zhihu.com/p/58888246

cfssl生成证书并部署

安装

cfssl是基于go的,因此需要安装go

go get -u github.com/cloudflare/cfssl/cmd/cfssl
go get -u github.com/cloudflare/cfssl/cmd/cfssljson

安装完成后如果不能直接使用cfssl请检查$GOPATH/bin是否加入到PATH环境变量中。

执行cfssl查看一下cfssl提供的命令

Usage:
Available commands:
        sign
        serve
        gencsr
        ocspsign
        ocspserve
        revoke
        certinfo
        version
        crl
        gencert
        ocsprefresh
        scan
        info
        print-defaults
        bundle
        genkey
        gencrl
        ocspdump
        selfsign
Top-level flags:

创建CA

CA的全称是Certificate Authority,也叫证书授权中心。CA的作用是作为一个权威的、被信任的第三方机构,提供管理和签发证书。在使用https访问一个网站时,为了证明这个网站是可信任的,那么就需要使用CA颁发的证书来证明自己。

因为在内网环境中搭建docker-registry,必然不可能用到互联网上的CA机构,因此我们需要自己扮演这个角色。首先创建文件夹

mkdir ca
cfssl print-defaults config > ca/ca-config.json
cfssl print-defaults csr > ca/ca-csr.json

然后修改ca-config.json

{
    "signing": {
        "default": {
            "expiry": "2540400h"
        },
        "profiles": {
            "www": {
                "expiry": "2540400h",
                "usages": [
                    "signing",
                    "key encipherment",
                    "server auth"
                ]
            },
            "client": {
                "expiry": "2540400h",
                "usages": [
                    "signing",
                    "key encipherment",
                    "client auth"
                ]
            }
        }
    }
}

生成ca

cfssl gencert -initca ca/ca-csr.json | cfssljson -bare ca/ca -

生成服务器证书

mkdir server
cfssl print-defaults csr > server/server.json

修改server.json

{
    "CN": "docker-registry",
    "hosts": [
        "docker-registry.k8s",
        "www.docker-registry.k8s"
    ],
    "key": {
        "algo": "ecdsa",
        "size": 256
    },
    "names": [
        {
            "C": "CN",
            "ST": "SH",
            "L": "Shanghai"
        }
    ]
}

签发证书

cfssl gencert -ca=ca/ca.pem -ca-key=ca/ca-key.pem -config=ca/ca-config.json -profile=www server/server.json | cfssljson -bare server/server

使用

上面两步得到了ca.pem, server-key.pem, server.pem。ca.pem是ca的证书,server-key.pem是服务器证书的私钥,server.pem是服务器证书

为了在k8s中使用,我们需要先创建默认的tls secret

kubectl -n docker-registry create secret tls docker-registry-tls-cert --key=server/server-key.pem --cert=server/server.pem

为电脑导入ca证书

sudo mkdir /usr/share/ca-certificates/extra
sudo cp ca.pem /usr/share/ca-certificates/extra/ca.crt

运行命令更新证书

sudo dpkg-reconfigure ca-certificates

然后使用curl查看ca根证书是否安装成功

curl -v https://docker-registry.k8s

显示一下内容表示成功

* TLSv1.3 (OUT), TLS handshake, Client hello (1):
* TLSv1.3 (IN), TLS handshake, Server hello (2):
* TLSv1.2 (IN), TLS handshake, Certificate (11):
* TLSv1.2 (IN), TLS handshake, Server key exchange (12):
* TLSv1.2 (IN), TLS handshake, Server finished (14):
* TLSv1.2 (OUT), TLS handshake, Client key exchange (16):
* TLSv1.2 (OUT), TLS change cipher, Change cipher spec (1):
* TLSv1.2 (OUT), TLS handshake, Finished (20):
* TLSv1.2 (IN), TLS handshake, Finished (20):
* SSL connection using TLSv1.2 / ECDHE-ECDSA-AES256-GCM-SHA384
* ALPN, server accepted to use http/1.1
* Server certificate:
*  subject: C=CN; ST=SH; L=SH; CN=DR
*  start date: Jun 26 06:37:00 2019 GMT
*  expire date: Apr 17 06:37:00 2309 GMT
*  subjectAltName: host "docker-registry.k8s" matched cert's "docker-registry.k8s"
*  issuer: C=US; ST=CA; L=San Francisco; CN=example.net
*  SSL certificate verify ok.
> GET / HTTP/1.1
> Host: docker-registry.k8s
> User-Agent: curl/7.64.0
> Accept: */*
> 
< HTTP/1.1 200 OK
< Server: nginx/1.15.9
< Date: Wed, 26 Jun 2019 11:40:04 GMT
< Content-Length: 0
< Connection: keep-alive
< Cache-Control: no-cache
< 
* Connection #0 to host docker-registry.k8s left intact

这里有一点需要注意的,我的电脑上安装了anaconda的环境,因此curl是在anaconda下的curl,它会默认加载/home/username/anaconda3/certs/cacert.pem这个。因此我需要将/etc/ssl/certs/ca-certificates.crt覆盖掉这个文件才能默认加载。或者也可以使用--cacert指定根目录文件的位置。

或者可以参考这篇详细的说明: https://jite.eu/2019/2/6/ca-with-cfssl/#

chrome下的相关设置

即使在我将根证书设置好并且验证成功,但是chrome仍然会有不安全的标识,这是因为需要为chrome单独导入根证书,设置的路径为: 设置 -> 高级 -> HTTPS相关设置。

go处理多态的JSON

最近使用go在对h2o的REST API进行封装时,发现了h2o的JSON返回值中有Polymorphic类型的字段。Polymorphic可以翻译为多态。一个多态的类型可以理解为既可能是float型,也可能是string类型等等。

在我的理解中,一个JSON中每个字段的类型都必须是确定的,在动态语言比如php, js这种,处理这种不确定的类型很方便。但是在go这种静态语言中,一个不确定的类型导致在解析中总是会出现类似于这样子的错误: json: cannot unmarshal string into Go struct field Foo.Value of type int

使用interface{}处理多态的问题

在go中,interface{}是一个空接口,那么所有类型都可以看做实现了这个空接口,因此interface{}可以接收任何类型的值。那么如果一个json既可能是{"mean": 123}, 又可能是{"mean": "NaN"},就可以使用以下结构体:

type Prediction struct {
    Mean interface{} `json:"mean"`
}

但是在使用Mean这个变量的时候,就比较麻烦了。

t2 := `{"mean": "NaN"}`

var p2 Prediction
err = json.Unmarshal([]byte(t2), &p2)

if err != nil {
    log.Println(err)
} else {
    if mean, ok := p2.Mean.(string); ok {
        log.Printf("p2 mean: %s \n", mean)
    } else if n, ok := p2.Mean.(int); ok {
        mean = strconv.Itoa(n)
        log.Printf("p2 mean: %s \n", mean)
    }
}

实现Unmarshaler和Marshaler接口

type Marshaler interface {
        MarshalJSON() ([]byte, error)
}

type Unmarshaler interface {
        UnmarshalJSON([]byte) error
}

在使用json.Unmarshaljson.Marshal时,会自动调用对应变量类型的UnmarshalJSONMarshalJSON方法。这样就可以定义一个FlexString类型

type FlexString string
type Prediction struct {
    Mean FlexString `json:"mean"`
}

然后实现*FlexString的UnmarshalJSON方法和FlexString的MarshalJSON方法。

func (fs *FlexString) UnmarshalJSON(b []byte) error {
    if b[0] == '"' {
        return json.Unmarshal(b, (*string)(fs))
    }
    var n float64
    if err := json.Unmarshal(b, &n); err != nil {
        return err
    }

    s := strconv.FormatFloat(n, 'g', 15, 64)

    *fs = FlexString(s)
    return nil
}

func (fs FlexString) MarshalJSON() ([]byte, error) {
    s := string(fs)
    n, err := strconv.ParseFloat(s, 64)
    if err != nil {
        return json.Marshal(s)
    }

    if math.IsNaN(n) {
        return json.Marshal(s)
    }

    return json.Marshal(n)
}

这样在使用的时候,就可以正常的对多态的Mean进行json解码以及编码了。

func main() {
    t1 := `{"mean": 123.1212}`
    var p1 Prediction
    err := json.Unmarshal([]byte(t1), &p1)
    if err != nil {
        log.Println(err)
    } else {
        log.Println("p1 mean value:", p1.Mean)
    }

    res1, err := json.Marshal(p1)
    if err != nil {
        log.Println("res1 error:", err)
    } else {
        log.Println("res1 :", string(res1))
    }

    t2 := `{"mean": "NaN"}`
    var p2 Prediction
    err = json.Unmarshal([]byte(t2), &p2)
    if err != nil {
        log.Println(err)
    } else {
        log.Println("p2 mean value:", p2.Mean)
    }

    res2, err := json.Marshal(p2)
    if err != nil {
        log.Println("res2 error:", err)
    } else {
        log.Println("res2 :", string(res2))
    }
}

运行结果如下:

2019/07/09 16:38:42 p1 mean value: 123.1212
2019/07/09 16:38:42 res1 : {"mean":123.1212}
2019/07/09 16:38:42 p2 mean value: NaN
2019/07/09 16:38:42 res2 : {"mean":"NaN"}

123.1212在解码再编码之后,仍然是float类型,NaN仍然是string类型。这里有一个注意事项,就是n, err := strconv.ParseFloat(s, 64)这个方法,如果s是NaN的字符串并不会出错,而是将n变成一个NaN,因此需要使用math.IsNaN进行检查。

基于phpx的php 扩展调试

用c写php的扩展不是一件轻松的事情,调试起来更是麻烦。普通的php扩展调试网上可以找到资料。我这里因为要用到c++,因此用了phpx来降低php扩展开发的门槛。

phpx项目地址:https://github.com/swoole/phpx

准备环境

ubuntu16.04, gcc, git, wget

首先从php的官方网站下载需要的php版本的源代码: https://php.net/releases/

我下载的是php-7.0版本的

wget https://www.php.net/distributions/php-7.0.33.tar.gz

下载完成后,解压进入目录,安装一些必要的依赖库。

sudo apt-get install libxml2-dev
./configure --enable-debug
make -j 4
sudo make install

因为我的扩展使用了phpx这个库,所有phpx的编译也需要开启debug模式,编辑cmakelist.txt加入 SET(CMAKE_BUILD_TYPE "Debug")。然后

cmake .
make -j 4
sudo make install

注意:在编译php扩展的时候也要开启调试模式。

使用gdb调试

因为php+phpx这个组合编译出来的扩展没有办法像普通的gdb调试一样,通过行号来打断点调试。因此需要根据函数名来打断点。

比如我的代码中有一个关键的方法名叫做input, 因此可以

nm /usr/local/lib/php/extensions/debug-non-zts-20151012/php_dv.so | grep input

得到以下的查找结果

000000000003a99e T _Z8Dv_inputRN3php6ObjectERNS_4ArgsERNS_7VariantE
00000000000491f8 T _ZN11DataAdaptor5inputESt6vectorIP6PersonSaIS2_EE

_ZN11DataAdaptor5inputESt6vectorIP6PersonSaIS2_EE这个就是可以打断点的函数名啦。

gdb php
break _ZN11DataAdaptor5inputESt6vectorIP6PersonSaIS2_EE
run paintAll.php

就可以看到断点打在了

Breakpoint 1, DataAdaptor::input (this=0x1259de0, 
    allPerson=std::vector of length 2622, capacity 2622 = {...}) at ../DataAdaptor.cc:195
195    void DataAdaptor::input(vector allPerson) {

可以使用layout分割窗口,大致结果如下:

gdb调试窗口

coredump信息的查看

php扩展出错后,也可以生成coredump文件。需要修改limits.conf中的core文件大小,比如设置成unlimited。生成core文件后,执行gdb php core, 然后使用bt即可打印出方法的调用栈,从而分析问题出现的原因。

如何在go中优雅的热升级服务

一、概述

在日常业务中,服务会经常升级,但是因为某些原因不希望断开和客户端的连接。因此就需要服务的热升级技术。

在研究这个问题之前,可以先看一下nginx是如何做到不间断服务热重启的。

  • 将新的nginx可执行文件替换掉旧的可执行文件。
  • master进程发送USR2信号,master进程在接收到信号后会将pid文件命名为.oldbin后缀。之后启动新的可执行文件,并启动新的worker进程。这个时候会有两个master进程

  • 向第一个master进程发送WINCH信号。第一个master进程会通知旧的worker进程优雅(处理完当前请求)地退出。

  • 如果此时新的可执行文件有问题。可以做以下措施:
  • 向旧的master进程发送HUP信号,旧的master进程会启动新的worker进程,并且不会重新读取配置文件。然后会向新的master进程发送QUIT信号来要求其退出。
  • 发送TERM信号到新的master进程,新的master进程和其派生的worker进程都会立刻退出。旧的master进程会自动启动新的worker进程。
  • 如果升级成功,QUIT信号会发送给旧的master进程,并退出。

完整的文档可以看: http://nginx.org/en/docs/control.html#upgrade

在go中处理这个问题也是这个思路。

二、具体实现

2.1 定义服务

type Server struct {
l         net.Listener  // 监听端口
conns     map[int]net.Conn  // 当前服务的所有连接
rw        sync.RWMutex      // 读写锁,用来保证conns在并发情况下的正常工作
idLock    sync.Mutex        // 锁,用来保证idCursor在并发情况下的递增没有问题
idCursor  int               // 用来标记当前连接的id
isChild   bool // 是否是子进程
status    int  // 当前服务的状态
relaxTime int  // 在退出时允许协程处理请求的时间
}

2.2 处理信号

func (s *Server) handleSignal() {
sc := make(chan os.Signal)

signal.Notify(sc, syscall.SIGHUP, syscall.SIGTERM)

for {
sig := <-sc

switch sig {
case syscall.SIGHUP:
log.Println("signal sighup")
// reload
go func() {
s.fork()
}()
case syscall.SIGTERM:
log.Println("signal sigterm")
// stop
s.shutdown()
}
}
}

这里只处理了两个信号,HUP表示要热升级服务,此时会fork一个新的服务。TERM表示要终止服务。

2.3 如何fork新的服务

func (s *Server) fork() (err error) {

log.Println("start forking")
serverLock.Lock()
defer serverLock.Unlock()

if isForked {
return errors.New("Another process already forked. Ignoring this one")
}

isForked = true

files := make([]*os.File, 1+len(s.conns))
files[0], err = s.l.(*net.TCPListener).File() // 将监听带入到子进程中
if err != nil {
log.Println(err)
return
}

i := 1
for _, conn := range s.conns {
files[i], err = conn.(*net.TCPConn).File()

if err != nil {
log.Println(err)
return
}

i++
}

env := append(os.Environ(), CHILD_PROCESS+"=1")
env = append(env, fmt.Sprintf("%s=%s", SERVER_CONN, strconv.Itoa(len(s.conns))))

path := os.Args[0] // 当前可执行程序的路径
var args []string
if len(os.Args) > 1 {
args = os.Args[1:]
}

cmd := exec.Command(path, args...)
cmd.Stdin = os.Stdin
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.ExtraFiles = files
cmd.Env = env

err = cmd.Start()
if err != nil {
log.Println(err)
return
}

return
}

这里会将监听的文件描述符以及所有连接的文件描述符都带到新的服务中。这里只需要在新的服务中重新使用这些文件描述符即可保证不断开连接。

2.4 服务的启动流程

func (s *Server) Start(addr string) {
var err error

log.Printf("pid: %v \n", os.Getpid())

s.setState(StateInit)

if s.isChild {
log.Println("进入子进程")
// 通知父进程停止
ppid := os.Getppid()

err := syscall.Kill(ppid, syscall.SIGTERM)

if err != nil {
log.Fatal(err)
}

// 子进程, 重新监听之前的连接
connN, err := strconv.Atoi(os.Getenv(SERVER_CONN))
if err != nil {
log.Fatal(err)
}

for i := 0; i < connN; i++ {
f := os.NewFile(uintptr(4+i), "")
c, err := net.FileConn(f)
if err != nil {
log.Print(err)
} else {
id := s.add(c)
go s.handleConn(c, id)
}
}
}

s.l, err = s.getListener(addr)
if err != nil {
log.Fatal(err)
}
defer s.l.Close()

log.Println("listen on ", addr)

go s.handleSignal()

s.setState(StateRunning)

for {
log.Println("start accept")
conn, err := s.l.Accept()
if err != nil {
log.Fatal(err)
return
}

log.Println("accept new conn")

id := s.add(conn)
go s.handleConn(conn, id)
}

}

func (s *Server) getListener(addr string) (l net.Listener, err error) {
if s.isChild {
f := os.NewFile(3, "")
l, err = net.FileListener(f)
return
}

l, err = net.Listen("tcp", addr)

return
}

启动时,会判断是否是fork出的新的进程。如果是,则继承从父进程传递过来的文件描述符,并重新监听或作为连接处理。

完整的代码参考github: https://github.com/joyme123/graceful_restart_server_in_golang_demo