-## 一、背景
考虑可扩展性,后台的服务肯定是要能够支持任意的扩展的,这样才能在业务量增长时通过增加机器的方式来应对。这对后台服务提出了一个要求,必须处理好分布式环境和单机环境的不同带来的问题。比如:在文件的分片上传这一场景下,应该负载均衡的问题,一个文件的多个分片请求会分布到不同的服务器上,这导致在将多个分片合并成完整文件时出现问题,而单机情况下则完全不会有这样的问题。
二、难点
这个问题的解决方案有很多种,但是需要根据实际情况尽量选择简洁、易部署和维护的方案进行,并且不能丢掉分布式系统的优点。比如网上的有的方案是使用单独的文件上传服务器,但是这就变成了单机服务了。也有使用NFS挂载的方案,即所有的服务器挂载一个相同的NFS目录,所有上传相关的文件都存放在挂载的目录下,这不仅给运维带来了麻烦,为了保证NFS的高可用,也带来了额外的运维成本。
三、解决方案
3.1 借助负载均衡
借助负载均衡的方案很简单,这是和应用无关的一种方法。即通过负载均衡这一层,将同一个文件的不同分片的请求全部导向到同一个服务器上。比如负载均衡这一块使用的是nginx, 通过url hash
的方式来完成,可以使用如下的配置:
upstream backend {
server 0.0.0.0:8080;
server 0.0.0.0:8081;
server 0.0.0.0:8082;
hash request_uri;
}
server {
location / {
proxy_pass http://backend;
proxy_set_header X-Real-IPremote_addr;
proxy_set_header Host $host;
}
}
然后写一个简单的服务来验证一下:
func post(resp http.ResponseWriter, req *http.Request) {
v := req.PostFormValue("key")
identity := req.PostFormValue("identity")
log.Printf("identity: %v, value: %v\n", identity, v)
resp.WriteHeader(http.StatusOK)
_, err := resp.Write([]byte("ok"))
if err != nil {
log.Println("error:", err)
}
}
func main() {
var addr string
flag.StringVar(&addr, "addr", "0.0.0.0:8080", "http listen addr:port")
flag.Parse()
http.HandleFunc("/Post", post)
log.Println("listen ", addr)
err := http.ListenAndServe(addr, nil)
if err != nil {
log.Fatal(err)
}
}
我们启动三个服务:
./godemo -addr 0.0.0.0:8080
./godemo -addr 0.0.0.0:8081
./godemo -addr 0.0.0.0:8082
使用curl来post数据过来,请求的地址类似于: http://0.0.0.0/Post?123456
。?
后面可以当成是文件的唯一标识码,比如文件和用户id的组合的md5信息等,这样对于同一个用户上传的同一个文件的不同分片请求,通过url哈希都会得到同样的结果,这样就会被转发到同一个后端服务器。
curl http://0.0.0.0/Post\?123456 -d "key=value&identity=123456" -X POST
curl http://0.0.0.0/Post\?123456 -d "key=value&identity=123456" -X POST
curl http://0.0.0.0/Post\?123456 -d "key=value&identity=123456" -X POST
curl http://0.0.0.0/Post\?789abc -d "key=vvvvv&identity=789abc" -X POST
curl http://0.0.0.0/Post\?789abc -d "key=vvvvv&identity=789abc" -X POST
curl http://0.0.0.0/Post\?789abc -d "key=vvvvv&identity=789abc" -X POST
结果如下:
端口8080的服务收到了三条请求:
2019/09/26 13:58:52 identity: 123456, value: value
2019/09/26 13:58:56 identity: 123456, value: value
2019/09/26 13:59:03 identity: 123456, value: value
端口8081的服务收到了三条请求:
2019/09/26 13:59:40 identity: 789abc, value: vvvvv
2019/09/26 13:59:43 identity: 789abc, value: vvvvv
2019/09/26 13:59:44 identity: 789abc, value: vvvvv
如果在k8s集群中部署,使用nginx-ingress
的话可以使用以下的部署方案:
apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
name: pipeline-ingress
namespace: default
annotations:
nginx.ingress.kubernetes.io/proxy-body-size: "50m"
nginx.ingress.kubernetes.io/upstream-hash-by: "$request_uri"
spec:
rules:
- host: pipeline.dev.com
http:
paths:
- path: /
backend:
serviceName: pipeline-service
servicePort: 8888
当然这种方式需要注意你的每个请求的URI
都需要加上额外的参数(比如用户的userId的md5),这样才能均匀的分布到不同的服务器上。
3.2 后台程序自动proxy请求
这个方案的思路是集群中的每个服务实例都有单独的标识,文件分片在第一次上传时会返回给它一个该请求所属服务器的identity,之后所有的请求会带上这个identity,之后收到请求的服务器会检查这个identity是不是属于自己,如果不属于自己,则把这个请求转发给所属的服务器。
这个方案要求每台服务器都知道其他所有服务器的identity和地址。这里我们可以使用etcd
这样的分布式数据库来存储。每台服务器在启动时都向etcd里注册自己的identity和address,之后服务器转发的时候都向etcd里面查找对应的address即可。
下面是一个示例的代码:
package main
import (
"context"
"encoding/json"
"flag"
"io/ioutil"
"log"
"net/http"
"go.etcd.io/etcd/clientv3"
"net/url"
"time"
)
type Server struct {
Etcd string
Addr string
Identify string
}
type ResponseObj struct {
Identity string `json:"identity,omitempty"`
Value string `json:"value"`
}
func getEtcdKV(etcd string) (clientv3.KV, error) {
cfg := clientv3.Config{
Endpoints: []string{etcd},
// set timeout per request to fail fast when the target endpoint is unavailable
DialTimeout: time.Second,
}
cli, err := clientv3.New(cfg)
if err != nil {
return nil, err
}
return clientv3.NewKV(cli), nil
}
func httpProxy(anotherServer string, body map[string]string) (*http.Response, error) {
formData := url.Values{}
for k, v := range body {
formData.Set(k ,v)
}
return http.PostForm(anotherServer, formData)
}
func (s *Server) Register() error {
cli, err := getEtcdKV(s.Etcd)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(1)*time.Second)
_, err = cli.Put(ctx, s.Identify, "http://" + s.Addr)
cancel()
if err != nil {
return err
}
return nil
}
func (s *Server) Post(resp http.ResponseWriter, req *http.Request) {
v := req.PostFormValue("key")
identity := req.PostFormValue("identity")
log.Printf("identity: %v, value: %v\n", identity, v)
var data ResponseObj
if identity != "" && identity != s.Identify {
// proxy
cli, err := getEtcdKV(s.Etcd)
if err != nil {
log.Println("get kv client error", err)
resp.WriteHeader(http.StatusInternalServerError)
resp.Write([]byte("error"))
return
}
etcdResp, err := cli.Get(context.Background(), identity)
if err != nil {
log.Println("get value error: ", err)
resp.WriteHeader(http.StatusInternalServerError)
resp.Write([]byte("error"))
return
}
if len(etcdResp.Kvs) == 0 {
log.Println("没有值")
resp.WriteHeader(http.StatusInternalServerError)
resp.Write([]byte("error"))
return
}
proxyAddr := string(etcdResp.Kvs[0].Value)
log.Println("proxy to: ", proxyAddr)
text := map[string]string {
"key": v,
"identity": identity,
}
proxyResp, err := httpProxy(proxyAddr+"/Post", text)
if err != nil || proxyResp.Body == nil {
log.Println("http proxy error: ", err)
resp.WriteHeader(http.StatusInternalServerError)
resp.Write([]byte("error"))
return
}
bodyData, err := ioutil.ReadAll(proxyResp.Body)
if err != nil {
log.Println("read proxy body error: ", err)
resp.WriteHeader(http.StatusInternalServerError)
resp.Write([]byte("error"))
return
}
err = json.Unmarshal(bodyData, &data)
if err != nil {
log.Println("json unmarshal error: ", err)
resp.WriteHeader(http.StatusInternalServerError)
resp.Write([]byte("error"))
return
}
} else {
data.Identity = s.Identify
data.Value = v
}
text, err := json.Marshal(data)
if err != nil {
log.Println("marshal json error: ", err)
resp.WriteHeader(http.StatusInternalServerError)
return
}
resp.WriteHeader(http.StatusOK)
_, err = resp.Write(text)
if err != nil {
log.Println("error:", err)
}
}
func main() {
var addr string
var identity string
var etcd string
flag.StringVar(&addr, "addr", "0.0.0.0:8080", "http listen addr:port")
flag.StringVar(&identity, "identity", "", "identify the server")
flag.StringVar(&etcd, "etcd", "http://127.0.0.1:2379", "etcd server url")
flag.Parse()
if identity == "" {
log.Fatal("identity不可为空")
}
var server = &Server{
Addr: addr,
Identify: identity,
Etcd: etcd,
}
err := server.Register()
if err != nil {
log.Fatal("register server error, check your etcd server: ", err)
}
http.HandleFunc("/Post", server.Post)
log.Println("listen ", addr)
err = http.ListenAndServe(addr, nil)
if err != nil {
log.Fatal(err)
}
}
3.3 基于S3存储的方案
兼容S3的存储系统有一个对外的接口叫做: ComposeObject
。这个接口可以将多个文件合并成一个文件存储到指定位置。如果我们的底层存储是基于兼容S3的系统,那么就可以利用这个接口轻松的实现。
方案的步骤可以描述成以下:
- 前端实现将文件分片,然后上传
- 上传的请求会因为前端负载均衡的原因分布到不同的服务器,每个上传请求都要带上这个文件、用户id、随机字符串组合的md5值,服务器收到请求后,只管将分片上传到以md5值为名的目录下,一旦上传成功,使用etcd或redis这样作为分片计数器+1.
- 分片总数等于总分片数时调用
ComposeObject
组合所有分片存储到指定位置即可。