您现在的位置是:亿华云 > 系统运维
Golang应付百万级请求/分钟
亿华云2025-10-03 22:08:19【系统运维】5人已围观
简介我在不同公司从事反爬虫、反病毒、反恶意程序已经有15年了,我知道,由于每天需要处理和应对的大量数据,这些系统最终会因此变得十分复杂。目前我是smsjunk.com的CEO以及KnowBe4的首席架构师
我在不同公司从事反爬虫、应付反病毒、百万反恶意程序已经有15年了,求分我知道,应付由于每天需要处理和应对的百万大量数据,这些系统最终会因此变得十分复杂。求分
目前我是应付smsjunk.com的CEO以及KnowBe4的首席架构师,两家公司都是百万活跃与网络安全行业。
网站模板开发人员已经很多年了,然后我开始慢慢意识到,使用合适的工具让系统变得更加简单明了才是一件正确的事情。
编程界对于编程语言以及框架的争论从未停歇,而我并不想参与到其中去。我相信效率高低,生产力大小以及代码的可维护性很大一部分取决于你所设计的架构是否足够简单。
要解决的问题
当我们开发一个匿名遥测以及数据分析系统的时候,其中一个需求是能够处理和应付百万数量级的POST请求,网络请求处理器会接收一个POST过来JSON,这个JSON里面会包含许多需要写入到Amazon S3的数据集合,以便我们的map-reduce系统可以在后续来处理这些数据。
一般情况下我们会考虑构建一个worker分层的结构,并且利用一些中间件,例如:
Sidekiq Resque DelayedJob Elasticbeanstalk Worker Tier RabbitMQ 等等..然后设立两个不同的集群,一个是给web客户端,另一个是云服务器给worker,然后我们可以将worker扩容到我们处理业务时所需要的数量。
但在最开始的时候,我们的团队就意识到可以用Go来实现所有这些,因为在讨论期间我们认为这将会是一个非常高访问量的系统。我利用Go来开发也已经有两年了,用它来开发过一些系统,但是负载规模远没有此次的需求这么大。
我们先定义一些struct来规定我们POST接收的请求体,以及定义一个上传到S3 bucket的方法UploadToS3
type PayloadCollection struct { WindowsVersion string `json:" version"` <="" p="" style="width: 100%; margin-bottom: 20px;"> Token string `json:"token"` Payloads []Payload `json:"data"` } type Payload struct { // [redacted] } func (p *Payload) UploadToS3() error { // the storageFolder method ensures that there are no name collision in // case we get same timestamp in the key name storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano()) bucket := S3Bucket b := new(bytes.Buffer) encodeErr := json.NewEncoder(b).Encode(payload) if encodeErr != nil { return encodeErr } // Everything we post to the S3 bucket should be marked private var acl = s3.Private var contentType = "application/octet-stream" return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{ }) }幼稚地使用Go runtines
最开始的时候我们非常天真地实现一个POST的钩子方法如下,只是简单地将每个请求体的上传动作放到Go rutinues中让他们并行执行:
func payloadHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } // Read the body into a string for json decoding var content = &PayloadCollection{ } err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content) if err != nil { w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusBadRequest) return } // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { go payload.UploadToS3() // <----- DONT DO THIS } w.WriteHeader(http.StatusOK) }在中等规模的负载情况下,这种方法对大部分人都是没有问题的,但在应对更大规模的请求量时候,我们很快就招架不住了。当我们把这个版本的代码部署到生产环境以后,云服务器提供商我们期待能有大量的请求进来但实际还不能达到百万级别的数量级。我们完全低估了这个系统要处理的流量数。
但不管怎么说上面的方法都是欠妥的。因为它没有任何方法让我们去控制Go runtinues启动的数量。所以当我们的系统在面对每分钟百万级POST请求的时候很快就垮掉了。
再战
我们需要找到另外的方法。在一开始我们就在讨论如何让我们的请求处理程序的生命周期尽可能地缩短以及上传到S3的操作能在后台或者异步运行。当然,在Ruby on Rails里面你必须这么做,否则你将会阻塞到所有其他的网络请求处理程序。无论您使用的是美洲狮,独角兽还是过路人(请不要参与JRuby讨论)。然后我们想到使用消息队列这种比较常见的方法来处理来达到我们的目的,例如Resque, Sidekiq, SQS等等,还有数不清的工具因为实在有太多方法来实现这个功能。
所以在第二次迭代的时候,我们需要创建一个缓冲队列,我们会将任务放入队列里面然后再一个个地上传到S3上,但由于我们希望达到能够控制这个队列的最大容量的目的,并且我们有足够的RAM来允许我们将请求体储存到内存当中,所以我们认为直接使用了Go提供的channel,然后将我们的请求直接入队到channel中处理就可以了。
var Queue chan Payload func init() { Queue = make(chan Payload, MAX_QUEUE) } func payloadHandler(w http.ResponseWriter, r *http.Request) { ... // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { Queue <- payload } ... }我们会从channel中获取任务并且执行他们的上传操作
func StartProcessor() { for { select { case job := <-Queue: job.payload.UploadToS3() // <-- STILL NOT GOOD } } }但说句老实话,我并不知道这是在干嘛。肯定是因为那时已经太晚还有我们已经喝了太多的红牛。
这个改动并没有让我们的困境得到任何改善,我们将并发任务放到了队列中执行仅仅是看上去好像解决了问题。但是我们的异步程序一次只会上传一个请求体到S3上面,但是我们的请求数此时远远大于我们上传到S3的数量,可想而知我们的缓冲队列很快就到达了他的极限爆满了,然后它阻挡了其他网络请求的入队操作。
相当于我们仅仅回避了问题,并且让我们的系统的崩溃时间进入了倒数。我们这个缺陷的版本发布以后,整个系统的延迟率在持续性地每分钟在上涨。
MaxQueue = os.Getenv("MAX_QUEUE") ) // Job represents the job to be run type Job struct { Payload Payload } // A buffered channel that we can send work requests on. var JobQueue chan Job // Worker represents the worker that executes the job type Worker struct { WorkerPool chan chan Job JobChannel chan Job quit chan bool } func NewWorker(workerPool chan chan Job) Worker { return Worker{ WorkerPool: workerPool, JobChannel: make(chan Job), quit: make(chan bool)} } // Start method starts the run loop for the worker, listening for a quit channel in // case we need to stop it func (w Worker) Start() { go func() { for { // register the current worker into the worker queue. w.WorkerPool <- w.JobChannel select { case job := <-w.JobChannel: // we have received a work request. if err := job.Payload.UploadToS3(); err != nil { log.Errorf("Error uploading to S3: %s", err.Error()) } case <-w.quit: // we have received a signal to stop return } } }() } // Stop signals the worker to stop listening for work requests. func (w Worker) Stop() { go func() { w.quit <- true }() }
接下来修改我们网络请求的钩子函数,负责创建一个Job的结构体的实例然后将其放入JobQueue channel中等待worker来获取执行。
func payloadHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } // Read the body into a string for json decoding var content = &PayloadCollection{ } err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content) if err != nil { w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusBadRequest) return } // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { // lets create a job with the payload work := Job{ Payload: payload} // Push the work onto the queue. JobQueue <- work } w.WriteHeader(http.StatusOK) }在我们网络服务初始化的时候创建一个Dispather并且调用Run()创建一个装有一定数量worker的线程池,用来接收和处理来自JobQueue的Job
dispatcher := NewDispatcher(MaxWorker) dispatcher.Run()下面是我们Dispather的实现
type Dispatcher struct { // A pool of workers channels that are registered with the dispatcher WorkerPool chan chan Job } func NewDispatcher(maxWorkers int) *Dispatcher { pool := make(chan chan Job, maxWorkers) return &Dispatcher{ WorkerPool: pool} } func (d *Dispatcher) Run() { // starting n number of workers for i := 0; i < d.maxWorkers; i++ { worker := NewWorker(d.pool) worker.Start() } go d.dispatch() } func (d *Dispatcher) dispatch() { for { select { case job := <-JobQueue: // a job request has been received go func(job Job) { // try to obtain a worker job channel that is available. // this will block until a worker is idle jobChannel := <-d.WorkerPool // dispatch the job to the worker job channel jobChannel <- job }(job) } } }当我们将这个版本发布到生产环境以后我们的延迟率马上有明显的下降,我们处理请求的能力有一个质的飞跃。
很赞哦!(69437)