ujunのブログ

SQSのキューからひたすらreceiveするくん

60万件くらい入ってたSQSのキューからひたすらreceiveした人

10多重(goroutineを10個並列に起動してreceiveする)

package main

import ( 
    "sync"
    "fmt"
    "github.com/aws/aws-sdk-go/service/sqs"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    kingpin "gopkg.in/alecthomas/kingpin.v2"
)

// profile is the command-line flag selecting the AWS credential profile.
// It defaults to the empty string.
var profile = kingpin.Flag("profile", "AWS credential(default profile if not specified)").Default("").String()

// region is the command-line flag selecting the AWS region.
// It defaults to ap-northeast-1.
var region = kingpin.Flag("region", "AWS region(ap-northeast-1 if not specified)").Default("ap-northeast-1").String()

// main drains an SQS queue. It parses the command-line flags, builds an AWS
// session and SQS client, and starts a fixed number of worker goroutines.
// Each worker repeatedly receives a batch of messages, prints every message
// body to stdout and deletes the message, and stops when a receive call
// returns no messages. main returns once every worker has stopped.
//
// Any error from the receive or delete calls panics, terminating the program.
func main() {
	const (
		// queueURL is the URL of the queue to drain (placeholder value).
		queueURL = "url"
		// workers is the number of goroutines receiving concurrently.
		workers = 10
		// batchSize is the maximum number of messages requested per receive call.
		batchSize = 10
		// waitSeconds enables long polling so that an empty response is a more
		// reliable "queue is drained" signal than a short-poll response, which
		// (per the SQS documentation) may be empty while messages remain.
		waitSeconds = 1
	)

	kingpin.Parse()

	// Session and client are shared by all workers.
	sess := session.Must(session.NewSessionWithOptions(session.Options{
		Profile: *profile,
		Config:  aws.Config{Region: region},
	}))
	svc := sqs.New(sess)

	// The receive request is identical for every call, so build it once; the
	// workers only read it.
	receiveInput := &sqs.ReceiveMessageInput{}
	receiveInput.SetMaxNumberOfMessages(batchSize).SetQueueUrl(queueURL).SetWaitTimeSeconds(waitSeconds)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				out, err := svc.ReceiveMessage(receiveInput)
				if err != nil {
					// Do not mistake a failed call for an empty queue.
					panic(fmt.Errorf("receiving messages from %q: %w", queueURL, err))
				}
				if len(out.Messages) == 0 {
					return
				}
				for _, msg := range out.Messages {
					// aws.StringValue yields "" for a nil pointer instead of
					// dereferencing it.
					fmt.Println(aws.StringValue(msg.Body))

					deleteInput := &sqs.DeleteMessageInput{}
					deleteInput.SetQueueUrl(queueURL).SetReceiptHandle(aws.StringValue(msg.ReceiptHandle))
					if _, err := svc.DeleteMessage(deleteInput); err != nil {
						panic(err)
					}
				}
			}
		}()
	}
	wg.Wait()
}