分布式文件上传方案

-## 一、背景
考虑可扩展性,后台的服务肯定是要能够支持任意的扩展的,这样才能在业务量增长时通过增加机器的方式来应对。这对后台服务提出了一个要求,必须处理好分布式环境和单机环境的不同带来的问题。比如:在文件的分片上传这一场景下,应该负载均衡的问题,一个文件的多个分片请求会分布到不同的服务器上,这导致在将多个分片合并成完整文件时出现问题,而单机情况下则完全不会有这样的问题。

二、难点

这个问题的解决方案有很多种,但是需要根据实际情况尽量选择简洁、易部署和维护的方案进行,并且不能丢掉分布式系统的优点。比如网上的有的方案是使用单独的文件上传服务器,但是这就变成了单机服务了。也有使用NFS挂载的方案,即所有的服务器挂载一个相同的NFS目录,所有上传相关的文件都存放在挂载的目录下,这不仅给运维带来了麻烦,为了保证NFS的高可用,也带来了额外的运维成本。

三、解决方案

3.1 借助负载均衡

借助负载均衡的方案很简单,这是和应用无关的一种方法。即通过负载均衡这一层,将同一个文件的不同分片的请求全部导向到同一个服务器上。比如负载均衡这一块使用的是nginx, 通过url hash的方式来完成,可以使用如下的配置:

upstream backend {
    server 0.0.0.0:8080;
    server 0.0.0.0:8081;
    server 0.0.0.0:8082;
    hash request_uri;
}
server {
    location / {
        proxy_pass http://backend;
        proxy_set_header X-Real-IPremote_addr;
        proxy_set_header Host $host;
    }
}

然后写一个简单的服务来验证一下:

func post(resp http.ResponseWriter, req *http.Request) {
    v := req.PostFormValue("key")
    identity := req.PostFormValue("identity")
    log.Printf("identity: %v, value: %v\n", identity, v)

    resp.WriteHeader(http.StatusOK)
    _, err := resp.Write([]byte("ok"))
    if err != nil {
        log.Println("error:", err)
    }
}

func main() {
    var addr string
    flag.StringVar(&addr, "addr", "0.0.0.0:8080", "http listen addr:port")
    flag.Parse()

    http.HandleFunc("/Post", post)
    log.Println("listen ", addr)
    err := http.ListenAndServe(addr, nil)
    if err != nil {
        log.Fatal(err)
    }
}

我们启动三个服务:

./godemo -addr 0.0.0.0:8080
./godemo -addr 0.0.0.0:8081
./godemo -addr 0.0.0.0:8082

使用curl来post数据过来,请求的地址类似于: http://0.0.0.0/Post?123456?后面可以当成是文件的唯一标识码,比如文件和用户id的组合的md5信息等,这样对于同一个用户上传的同一个文件的不同分片请求,通过url哈希都会得到同样的结果,这样就会被转发到同一个后端服务器。

curl http://0.0.0.0/Post\?123456 -d "key=value&identity=123456" -X POST
curl http://0.0.0.0/Post\?123456 -d "key=value&identity=123456" -X POST
curl http://0.0.0.0/Post\?123456 -d "key=value&identity=123456" -X POST
curl http://0.0.0.0/Post\?789abc -d "key=vvvvv&identity=789abc" -X POST
curl http://0.0.0.0/Post\?789abc -d "key=vvvvv&identity=789abc" -X POST
curl http://0.0.0.0/Post\?789abc -d "key=vvvvv&identity=789abc" -X POST

结果如下:

端口8080的服务收到了三条请求:
2019/09/26 13:58:52 identity: 123456, value: value
2019/09/26 13:58:56 identity: 123456, value: value
2019/09/26 13:59:03 identity: 123456, value: value

端口8081的服务收到了三条请求:
2019/09/26 13:59:40 identity: 789abc, value: vvvvv
2019/09/26 13:59:43 identity: 789abc, value: vvvvv
2019/09/26 13:59:44 identity: 789abc, value: vvvvv

如果在k8s集群中部署,使用nginx-ingress的话可以使用以下的部署方案:

apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
    name: pipeline-ingress
    namespace: default
    annotations:
      nginx.ingress.kubernetes.io/proxy-body-size: "50m"
      nginx.ingress.kubernetes.io/upstream-hash-by: "$request_uri"
spec:
    rules:
        - host: pipeline.dev.com
          http:
              paths:
                  - path: / 
                    backend:
                        serviceName: pipeline-service
                        servicePort: 8888

当然这种方式需要注意你的每个请求的URI都需要加上额外的参数(比如用户的userId的md5),这样才能均匀的分布到不同的服务器上。

3.2 后台程序自动proxy请求

这个方案的思路是集群中的每个服务实例都有单独的标识,文件分片在第一次上传时会返回给它一个该请求所属服务器的identity,之后所有的请求会带上这个identity,之后收到请求的服务器会检查这个identity是不是属于自己,如果不属于自己,则把这个请求转发给所属的服务器。

这个方案要求每台服务器都知道其他所有服务器的identity和地址。这里我们可以使用etcd这样的分布式数据库来存储。每台服务器在启动时都向etcd里注册自己的identity和address,之后服务器转发的时候都向etcd里面查找对应的address即可。

下面是一个示例的代码:

package main

import (
    "context"
    "encoding/json"
    "flag"
    "io/ioutil"
    "log"
    "net/http"
    "go.etcd.io/etcd/clientv3"
    "net/url"
    "time"
)

type Server struct {
    Etcd string
    Addr string
    Identify string
}

type ResponseObj struct {
    Identity string `json:"identity,omitempty"`
    Value string `json:"value"`
}

func getEtcdKV(etcd string) (clientv3.KV, error) {
    cfg := clientv3.Config{
        Endpoints:               []string{etcd},
        // set timeout per request to fail fast when the target endpoint is unavailable
        DialTimeout: time.Second,
    }

    cli, err := clientv3.New(cfg)

    if err != nil {
        return nil, err
    }

    return clientv3.NewKV(cli), nil
}

func httpProxy(anotherServer string, body map[string]string) (*http.Response, error) {
    formData := url.Values{}

    for k, v := range body {
        formData.Set(k ,v)
    }

    return http.PostForm(anotherServer, formData)
}

func (s *Server) Register() error {
    cli, err := getEtcdKV(s.Etcd)
    if err != nil {
        return err
    }

    ctx, cancel := context.WithTimeout(context.Background(), time.Duration(1)*time.Second)
    _, err = cli.Put(ctx, s.Identify, "http://" + s.Addr)
    cancel()
    if err != nil {
        return err
    }

    return nil
}

func (s *Server) Post(resp http.ResponseWriter, req *http.Request) {
    v := req.PostFormValue("key")
    identity := req.PostFormValue("identity")
    log.Printf("identity: %v, value: %v\n", identity, v)

    var data ResponseObj

    if identity != "" && identity != s.Identify {
        // proxy
        cli, err := getEtcdKV(s.Etcd)
        if err != nil {
            log.Println("get kv client error", err)
            resp.WriteHeader(http.StatusInternalServerError)
            resp.Write([]byte("error"))
            return
        }

        etcdResp, err := cli.Get(context.Background(), identity)
        if err != nil {
            log.Println("get value error: ", err)
            resp.WriteHeader(http.StatusInternalServerError)
            resp.Write([]byte("error"))
            return
        }

        if len(etcdResp.Kvs) == 0 {
            log.Println("没有值")
            resp.WriteHeader(http.StatusInternalServerError)
            resp.Write([]byte("error"))
            return
        }

        proxyAddr := string(etcdResp.Kvs[0].Value)

        log.Println("proxy to: ", proxyAddr)

        text := map[string]string {
            "key": v,
            "identity": identity,
        }

        proxyResp, err := httpProxy(proxyAddr+"/Post", text)
        if err != nil || proxyResp.Body == nil {
            log.Println("http proxy error: ", err)
            resp.WriteHeader(http.StatusInternalServerError)
            resp.Write([]byte("error"))
            return
        }

        bodyData, err := ioutil.ReadAll(proxyResp.Body)
        if err != nil {
            log.Println("read proxy body error: ", err)
            resp.WriteHeader(http.StatusInternalServerError)
            resp.Write([]byte("error"))
            return
        }

        err = json.Unmarshal(bodyData, &data)
        if err != nil {
            log.Println("json unmarshal error: ", err)
            resp.WriteHeader(http.StatusInternalServerError)
            resp.Write([]byte("error"))
            return
        }
    } else {
        data.Identity = s.Identify
        data.Value = v
    }

    text, err := json.Marshal(data)
    if err != nil {
        log.Println("marshal json error: ", err)
        resp.WriteHeader(http.StatusInternalServerError)
        return
    }

    resp.WriteHeader(http.StatusOK)
    _, err = resp.Write(text)
    if err != nil {
        log.Println("error:", err)
    }
}

func main() {
    var addr string
    var identity string
    var etcd string
    flag.StringVar(&addr, "addr", "0.0.0.0:8080", "http listen addr:port")
    flag.StringVar(&identity, "identity", "", "identify the server")
    flag.StringVar(&etcd, "etcd", "http://127.0.0.1:2379", "etcd server url")
    flag.Parse()

    if identity == "" {
        log.Fatal("identity不可为空")
    }

    var server = &Server{
        Addr:     addr,
        Identify: identity,
        Etcd: etcd,
    }

    err := server.Register()
    if err != nil {
        log.Fatal("register server error, check your etcd server: ", err)
    }

    http.HandleFunc("/Post", server.Post)
    log.Println("listen ", addr)
    err = http.ListenAndServe(addr, nil)
    if err != nil {
        log.Fatal(err)
    }
}

3.3 基于S3存储的方案

兼容S3的存储系统有一个对外的接口叫做: ComposeObject。这个接口可以将多个文件合并成一个文件存储到指定位置。如果我们的底层存储是基于兼容S3的系统,那么就可以利用这个接口轻松的实现。

方案的步骤可以描述成以下:

  1. 前端实现将文件分片,然后上传
  2. 上传的请求会因为前端负载均衡的原因分布到不同的服务器,每个上传请求都要带上这个文件、用户id、随机字符串组合的md5值,服务器收到请求后,只管将分片上传到以md5值为名的目录下,一旦上传成功,使用etcd或redis这样作为分片计数器+1.
  3. 分片总数等于总分片数时调用ComposeObject组合所有分片存储到指定位置即可。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据