BPFを用いてPostgreSQL共有バッファ読込を動的トレーシング

この記事は、 Sansan Advent Calendar 2018 - Adventar 22日目のエントリです。

目次

AWRのSQL ordered by Readsのようにクエリごとにreadしたブロック数をPostgreSQLで知りたい

Oracle DBを運用していると、AWRやStatspackのお世話になることが多いはずである。 AWRでは、レポートの中に SQL ordered by Reads というセクションがあり、ストレージからReadしたブロック数をクエリごとに集計して表示してくれる。 普段からブロック読込の多いクエリはここを見れば把握でき、Oracleを触っていた時はここをよく見ていたし、実際に性能改善の解決の糸口になったこともあったりした。 ブロックの読込が多い傾向にあればそれはそのままロングクエリになる可能性が高いし、直感的でわかりやすい指標だと思う。

データベースのパフォーマンスというのは、永続ストレージに如何にアクセスしないで済むかということに大きく絡むため、クエリごとにメモリに読み込んだブロックを把握することはパフォーマンス改善の点で非常に有用であるのだけど、PostgreSQLではどのようにそれを観測すればいいだろうか。

bcc/BPFによるシステムの動的トレーシング

データベースの話題から一旦それる。

BPF(Berkeley Packet Filter)は、Linuxカーネル内で小さなユーザスクリプトを動作させることができる仮想マシンである。名前の通りもともとパケットフィルタツールとして 開発されたものであったが、近年においては拡張され、LinuxOSの動的トレーシングに使われるように進化してきた。例えば、特定ファイルに対するアクセスを捕捉して動作する小さなプログラムを簡単に書くことができる。

このように拡張されたBPFは、特にeBPF(enhanced BPF) と呼ばれるが、単にBPFといった時にはeBPFをさすことが多いようである。

BPFを扱う上では、bcc(BPF Compiler Collection) を使うと良い。

GitHub - iovisor/bcc: BCC - Tools for BPF-based Linux IO analysis, networking, monitoring, and more

これは、カーネルレベルの処理にCを、それをフックして動作するフロントエンドにPythonLuaを用いてで作成することで、BPFプログラムをより簡単に書くことができる。 パフォーマンス分析やネットワークトラフィック制御など、多くのタスクに適用可能である。

bcc/BPFは、RHEL8 BetaにもTechnology previewsとして導入され、今後注目すべき技術であることは間違いない。

Chapter 6. Technology previews - Red Hat Customer Portal

さて、PostgreSQLには、BPFをはじめとした分析ツールで動的トレーシングを行う時に使えるカーネルトレースポイント(USDT probe)があらかじめいくつも定義されている。標準で提供されるモニタリングツール以上にいろいろ運用上役立つものが多そうである。

クエリごとにreadしたブロック数 といったメトリクスは、そのプローブのうちの1つを使えば採取することができる。

PostgreSQLが提供するプローブ

PostgreSQLは動的トレーシングで利用できるプローブを数多く提供しており、標準で組み込まれるプローブはドキュメントにある通りである。

PostgreSQL: Documentation: 11: 28.5. Dynamic Tracing

独自のプローブを定義することも可能であるが、デフォルトでも多くのActivityを捕捉できそう。

前提として、 PostgreSQL自体が --enable-dtrace コンパイルオプション付きでビルドされている必要がある。(なお、今回のすべてのオペレーションは、DigitalOceanのUbuntu 18.10 x64 Droplet / PostgreSQL11.1で行った)

# cd /path/to/postgresql-source
# ./configure --enable-dtrace
# make && make install

ちなみに、MySQLは多くのディストリビューションで、デフォルトでUSDT probe有効化されてビルドされているらしい。

smgr-md-read-doneのコールバックで共有バッファ読込を観測するpython+c のコード

上記プローブのうち、今回は smgr-md-read-done に着目する。ドキュメント的には、 ブロックの読み込み終了を捕捉するプローブ と説明されている。

以下が、特定のバックエンドプロセスにアタッチし、ブロック読込を捕捉して対象ブロック情報を表示するスクリプト

使い方は、 blk_read_pgsql -p <postgres_backend_PID> という感じである。

bpf script for block read into shared_buffer · GitHub

CのパートとPythonのパートの中から中心的な部分をそれぞれ挙げると、

  • Cの部分

いきなりインラインでCを書いているが、やりたいことは、 smgr-md-read-done の結果として得られる値を構造体に詰めて BPF_PERF_OUTPUT() でイベントとしてユーザスペースに渡している。このイベントの出力を受け付けるのがpythonパートとなる。

#include <uapi/linux/ptrace.h>

struct data_t {
    u64 timestamp;
    int pid;
    int oid;
    int blkno;
    int requested_size;
    int act_size;
};

BPF_PERF_OUTPUT(events);

int query_blk_read(struct pt_regs *ctx) {
    struct data_t data = {};
    data.timestamp = bpf_ktime_get_ns();
    bpf_usdt_readarg(5, ctx, &data.oid);
    bpf_usdt_readarg(2, ctx, &data.blkno);
    bpf_usdt_readarg(8, ctx, &data.requested_size);
    bpf_usdt_readarg(7, ctx, &data.act_size);
    data.pid = bpf_get_current_pid_tgid();
    events.perf_submit(ctx, &data, sizeof(data));
    return 0;
}

smgr-md-read-done に対して先ほどCパートで定義したquery_blk_read() をコールバックとして登録している。Data クラスはイベントとして得られた値を格納する各種フィールドを持つ。イベント発生ごとに print_event() が実行される。

usdts = map(lambda pid: USDT(pid=pid), args.pids)
for usdt in usdts:
    usdt.enable_probe("smgr__md__read__done", "query_blk_read")

bpf = BPF(text=program, usdt_contexts=usdts)

class Data(ct.Structure):
    _fields_ = [
        ("timestamp", ct.c_ulonglong),
        ("pid", ct.c_int),
        ("oid", ct.c_int),
        ("blkno", ct.c_int),
        ("requested_size", ct.c_int),
        ("act_size", ct.c_int),
    ]

start = BPF.monotonic_time()

def print_event(cpu, data, size):
    event = ct.cast(data, ct.POINTER(Data)).contents
    print("%-14.6f %-6d %-6d %-6d %-6d %-6d" % (
        float(event.timestamp - start) / 1000000000,
        event.pid,
        event.oid,
        event.blkno,
        event.requested_size,
        event.act_size))

print("%-14s %-6s %-6s %-6s %-6s %-6s" % ("TIME(s)", "PID", "OID", "BLKNO", "REQ", "ACT"))

bpf["events"].open_perf_buffer(print_event, page_cnt=64)
while True:
    bpf.perf_buffer_poll() 

使用例

バックエンドプロセスをてきとうに選んでアタッチすると、以下のような状態で待機ループに入る。

# pgrep -al postgres
...
10834 postgres: postgres database [local] idle
...

# ./blk_read_pgsql -p 10834
TIME(s)        PID    OID    BLKNO  REQ    ACT

pid: 10834 からなにかクエリを発行すると、上記の待機中のコンソールに、読み込んだブロックが表示される。 たとえば以下のような結果が得られたなら、OID: 24643 のオブジェクトを最初のブロックから順に共有バッファに読み込んだことになる。きっと初めて触るテーブルをフルスキャンすればこのような結果になるはずである。

TIME(s)        PID    OID    BLKNO  REQ    ACT
7.021272       10834  24643  0      8192   8192
7.022607       10834  24643  1      8192   8192
7.023180       10834  24643  2      8192   8192
7.023365       10834  24643  3      8192   8192
7.023579       10834  24643  4      8192   8192
7.023738       10834  24643  5      8192   8192
...

pg_buffercache というcontribに含まれるextensionを使うと共有バッファの内訳を確認することができ、上記のクエリ直後だと以下のように OID: 24643 のブロックがshare_buffersに実際にのっていることがわかる。

database=# select relfilenode, relblocknumber from pg_buffercache where relfilenode = 24643;
 relfilenode | relblocknumber
-------------+----------------
       24643 |              0
       24643 |              1
       24643 |              2
       24643 |              3
       24643 |              4
       24643 |              5
...

このあと別のバックエンドプロセスを blk_read_pgsql で観測しながら同一のクエリを流してもブロック読み込が確認できないのは、既に共有バッファに必要なブロックが存在するからであることがわかる。

ちなみに VACUUM FULL のようなOIDが変わる処理を実施するとshare_buffersからもそのブロック情報が完全に消し去られ、やはりもう一度 blk_read_pgsql で読み込みが行われることが観察できる。きっとpg_repackとかpg_reorgでも同じことが確認できると思う。

Wrap-up / bccを利用した可観測性ツールが熱い

bcc/BPFを用いて、PostgreSQLが提供するUSDTを捕捉して低レベルなメトリクスを取得した。bccではPythonによる記述が非常に強力なのでこれは使える。

RHEL8の話題というとbcc/BPFが熱いと個人的には思っている。今回のPostgreSQL動的トレーシングの例はBPFの無限の可能性のうちの1つであって、実際にBPFをベースにした活発なプロジェクトがいくつもあるようなので、要注目技術として追っていきたい。

参考資料

Microservices Get Started その1

このエントリに書くこと

このエントリは以下のようなことについて自分が最近まなんだ範囲で書く。

自分の環境は以下:

- macOS 10.13.4
- cpython 3.6.1
- nameko 2.9.1
- Docker for Mac Version 18.06.0-ce-mac70


introduction

Microservicesというワードはよく聞くし、いくつかブログや本も齧ったりしたけど、 全く自分で構築したこともないということで、いつか仕事で役立つ時がくるかもしれないしローカル環境で作ってみようと思う。

NamekoというPython製のフレームワークを使う。
可愛らしい名前だが、初心者にやさしいチュートリアル的なものが一切なく、結構使い始めるのに困るので、そういった意味でもこのエントリを書いている。 本エントリはマジでhello worldしかしてない。

GitHub - nameko/nameko: Python framework for building microservices


MicroservicesとSOA

Microservicesを始めようとする時、絶対気になるのが、 SOA(Service oriented architecture)との違いかと思われる。自分もいまだに気になる。

SOAは、S/Wエンティティ同士が細かく分割されていて、それぞれがメッセージのやりとり(SOAPとかHTTP)でコミュニケーションする。

MicroservicesはこのSOAのアドバンスドというか延長線上にいる。

ただし両者は、目的が大きく異なる。 誤解をおそれない感じで書いてみる。

SOAについて

巨大な業務システムを構築する際に、さまざまなステークホルダがいて、 それらが独自に実装したサブシステム同士が通信することを考える。
取引処理と決済処理と在庫処理が互いにデータを参照しあう時、取引システムと決済システムと在庫システムが全く別の組織が開発をしていても、 共通のインタフェースがあると、それぞれの実装はブラックボックスでもよくて、実装言語はなんでもいいしプラットフォームもそれぞれが得意なものを選択できる。 開発体制を分断できる。
ブラックボックスの先が超巨大で複雑なインフラでも、超スパゲティコードでもSOASOAである。
こういうこともあり、SOAは比較的S/Wの内部的なアーキテクチャに関心がない。

Microservicesについて

こちらはもっと開発品質の向上にフォーカスしている。 デリバリの効率をあげ、S/Wの開発そのものの改善を目指すもの。 したがって必然的に、S/Wの内部的なアーキテクチャに強い関心がある。
個々のサービスは、自律的かつ非依存的で、管理可能なほど小さな単位で完結しており、デプロイが容易でなければならない。 SOAPみたいなXMLのやりとりではなく,RESTやRPCを用いてコミュニケーションする。
サービス間で同じDBを参照するとかコードをシェアするとかは基本的になしである。 このように、非常に小さなサイズのサービスを包括的に管理する必要があるという意味で、 モノリシックなシステムと比較して運用がすごく難しい。
Microservicesの文脈ではコンテナが前提となっているが、運用の困難さを解決すべくOSSがいくつも出ている。


Nameko

さて、PythonでMicroservicesを書く時、Namekoというフレームワークでかなり簡単に始められる。 ちなみに、なめこが群生している様が、小さなサービスが群れているMicroservicesとなんか似ているのが由来ということらしい。 最初に触るにはよさそうである。 ただ、ドキュメントにチュートリアル的な初心者にやさしいものがないので、少し困る。

RPCについて

MicroservicesではRPCでサービス間のコミュニケーションをすることが必要となる。 これは、ネットワーク越しに存在するコードを、見かけ上はローカルのものを呼ぶかのように実行する。 RESTなどと比較して難しいのが、自分が書いているコードで、気をつけないと意図せずリモートとの通信を連発する可能性があること。 外部のリソースを使うのは高くつくものであるので、リモートコールする箇所は意識しておく必要がある。

Message Queuingについて

Namekoでは、RPCによるリモートコールを実行すると、Message Queueにリクエストを詰める。 呼び出し先のサービスでは、このキューからメッセージをconsumeしてタスクを実行する。 consumerは水平スケールすることでパフォーマンスを確保することが容易。

RabbitMQについて

メッセージのブローカとして、NamekoではRabbitMQが利用できる。 自分はDocker for Macでお手軽にローカルにRabbigMQを立てることにする。


Namekoを使って最初のMicroservices

初めてやるので、普段の環境とは分けておいた方が無難だろう。 てきとうに検証用のディレクトリを掘ってそこに移動する。

$ mkdir first_nameko; cd first_nameko

専用の仮想環境を作っておく。 (自分はcpython 3.6.1 を使った)

$ pyenv-virtualenv 3.6.1 first_nameko
$ pyenv local first_nameko

Namekoをインストールする

$ pip install nameko

まずは最初の HelloWold サービスを作ってみる。 公式ドキュメントから借用して、 helloworld.py を以下の内容で作成する。

from nameko.rpc import rpc

class GreetingService:
    name = "greeting_service"

    @rpc
    def hello(self, name):
        return "Hello, {}!".format(name)

サービス名は、 greeting_service である
@rpc デコレータによって、このメソッドがRPC経由で呼び出されるようにする。
次に、以下の config.yaml を同じ階層に置く。

AMQP_URI: 'pyamqp://guest:guest@localhost'

AMQP とは、NamekoがRPCをやりとりする上で利用するプロトコルらしく、今回はメッセージのブローカとしてRabbitMQを使うので、そのエンドポイントを記述。 RabbitMQにはデフォルトで guest:guest なユーザがいる。

次に、RabbitMQ自体はDockerでさくっと立てる。

$ docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq

そして、初めてのサービスを起動する。

$ nameko run helloworld --config config.yaml
starting services: greeting_service
Connected to amqp://guest:**@127.0.0.1:5672//

Namekoには cliが付属しているので、使ってみる。

$ nameko shell
Nameko Python 3.6.1 (default, Sep 26 2017, 15:11:41)
[GCC 4.2.1 Compatible Apple LLVM 8.0.0 (clang-800.0.42.1)] shell on darwin
Broker: pyamqp://guest:guest@localhost
>>>

さきほど起動したサービスに定義したメソッドを呼び出してみる。

>>> n.rpc.greeting_service.hello("ujun")
'Hello, ujun!'


このあと

Namekoを使って簡単にMicroserviceを構築できそうな雰囲気があるので、どんどん使っていきたい。 nameko.web にHTTPハンドラを作るモジュールがあるので、RESTなエンドポイントを定義してWebアプリっぽいものも簡単に作れる。


つづく

TerraformでAWS CodeDeployのアプリケーションを作る

リリース時に手で展開しているバッチを、いい加減なんらかのデプロイツールでやりたい。

できる限りTerraformで環境構築したいというのもある。

参考: https://www.terraform.io/docs/providers/aws/r/codedeploy_app.html#

素の状態から簡単にアプリケーション作るなら以下のような感じ。

# アプリケーション名
resource "aws_codedeploy_app" "sample-app" {
  name = "sample"
}

# リリース時に参照するconfiguration
# 設定項目はリリースの際にどの程度のホストが正常起動している必要があるかのみ
resource "aws_codedeploy_deployment_config" "sample-config" {
  deployment_config_name = "sample-config"

  minimum_healthy_hosts {
    type  = "FLEET_PERCENT"
    value = 50
  }
}

# リリース対象のサーバを識別するための設定
# とりあえず `"application":"sample"` というタグがついたEC2インスタンスを対象としている
resource "aws_codedeploy_deployment_group" "sample-group" {
  app_name               = "${aws_codedeploy_app.sample-app.name}"
  deployment_group_name  = "sample-group"
  service_role_arn       = "${aws_iam_role.sample_codedeploy.arn}"
  deployment_config_name = "${aws_codedeploy_deployment_config.sample-config.id}"

  ec2_tag_set {
    ec2_tag_filter {
      type  = "KEY_AND_VALUE"
      key   = "application"
      value = "sample"
    }
  }
}

# 必要なIAMロールの設定
resource "aws_iam_role" "sample_codedeploy" {
  name = "sample_codedeploy"

  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": "codedeploy.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
EOF
}

resource "aws_iam_role_policy_attachment" "AWSCodeDeployRole" {
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSCodeDeployRole"
  role       = "${aws_iam_role.sample_codedeploy.name}"
}

PowerShell で基本的なコンピュータリソースのメトリクスをとる(プロセスごと)

Windows Server 2016の検証を行なっていてもろもろとるために調べた。

(実行前に Get-Counter -ListSet process | Select-Object -ExpandProperty Paths をやらないとエラーとなることがある 参考: 同一PSで、「Get-Counter '\Process(*)\% Processor Time'」 を複数回実行するとエラーが出る )

  • CPU使用率
(Get-Counter "\Process(*)\% Processor Time").CounterSamples | select InstanceName, @{label="CPU %";expression={$_.CookedValue / $Env:NUMBER_OF_PROCESSORS -as [int]}} | where {$_.InstanceName -ne "idle" -and $_.InstanceName -ne "_total"} | sort "CPU %" -Descending
  • Memory使用率
(Get-Counter "\Process(*)\working set - private").CounterSamples | select InstanceName, @{label="memory MBytes";expression={$_.CookedValue/1024/1024}}, @{label="memory %";expression={$_.CookedValue / (Get-WmiObject win32_computersystem).totalphysicalmemory }} | where {$_.InstanceName -ne "idle" -and $_.InstanceName -ne "_total"} | sort "memory %" -Descending
  • IO Write Bytes/sec
(Get-Counter "\Process(*)\IO Write Bytes/sec").CounterSamples | select InstanceName, @{label="IO Write MBytes/sec";expression={$_.CookedValue}} | where {$_.InstanceName -ne "idle" -and $_.InstanceName -ne "_total"} | sort "IO Write MBytes/sec" -Descending
  • IO Read Bytes/sec
(Get-Counter "\Process(*)\IO Read Bytes/sec").CounterSamples | select InstanceName, @{label="IO Read MBytes/sec";expression={$_.CookedValue}} | where {$_.InstanceName -ne "idle" -and $_.InstanceName -ne "_total"} | sort "IO Read MBytes/sec" -Descending
  • Thread Count
(Get-Counter "\Process(*)\Thread Count").CounterSamples | select InstanceName, @{label="Thread Count";expression={$_.CookedValue}} | where {$_.InstanceName -ne "idle" -and $_.InstanceName -ne "_total"} | sort "Thread Count" -Descending

Get-Counterで取得できるメトリクスは以下等で確認

Get-Counter -listset *
Get-Counter -ListSet process | Select-Object -ExpandProperty Paths

AWS Systems Manager でタスクのスケジューリングを置き換えれる

SSMとCloudWatch Eventで置き換えられそう。

Terraform の 以下を参考にする

AWS: aws_cloudwatch_event_target - Terraform by HashiCorp

resource "aws_iam_role" "ecs_events" {
  name = "ecs_events"
  assume_role_policy = <<DOC
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": "events.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
DOC
}

resource "aws_iam_role_policy" "ecs_events_run_task_with_any_role" {
  name = "ecs_events_run_task_with_any_role"
  role = "${aws_iam_role.ecs_events.id}"
  policy = <<DOC
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": "ecs:RunTask",
            "Resource": "${replace(aws_ecs_task_definition.task_name.arn, "/:\\d+$/", ":*")}"
        }
    ]
}
DOC
}

resource "aws_cloudwatch_event_target" "ecs_scheduled_task" {
  target_id = "run-scheduled-task-every-hour"
  arn       = "${aws_ecs_cluster.cluster_name.arn}"
  rule      = "${aws_cloudwatch_event_rule.every_hour.name}"
  role_arn  = "${aws_iam_role.ecs_events.arn}"

  ecs_target = {
    task_count = 1
    task_definition_arn = "${aws_ecs_task_definition.task_name.arn}"
  }

  input = <<DOC
{
  "containerOverrides": [
    {
      "name": "name-of-container-to-override",
      "command": ["bin/console", "scheduled-task"]
    }
  ]
}
DOC
}

Relational DBMS Internals - Ch.2

最近読んでいるので、2章のまとめ。

http://pages.di.unipi.it/ghelli/bd2/DBMS-Internals.pdf

  • Permanent Memory And Buffer Management
    • DBMSを実装する上で最初に解決するべきは、システムを構成する複数のコンポーネントに依存せずストレージを抽象的に扱うことのできるレイヤの提供である
      • Permanent Memory ManagerとBuffer Managerによって実現される
    • 記憶領域の種類
      • Main memory
      • Permanent memory with magnetic disks
      • Temporary memory with NAND flash memory
    • Permanent Memory Manager
      • diskを抽象化するレイヤを提供する
      • 各DBを物理ファイル(物理ページ)の集合で表現する
      • DBとは実データやインデックスの物理ページを含んだディレクトリである
      • 各レコードは実ファイル上に構成される論理ファイルとなる
    • Buffer Manager
      • Permanent Memory のページをMain memoryと対応される役割
      • トランザクションが必要としているページを取得するのに責任を持つ
      • クエリのパフォーマンスはTemporary memory に読み込むページの数に依存する
      • 以下BufferManagerの構成要素
        • The buffer pool
          • Permanen memory のコピーやその保持情報を格納するArray
          • 固定サイズなので、適切なアルゴリズムにしたがってfreeして空き容量を確保する必要がある
          • Arrayの各要素は以下の変数を持つ (最初はすべての要素のpin countは0, dirtyはfalse)
            • the pin count(その要素が含むページが現在参照されている数)
            • dirty(そのページが、buffer poolに読み込まれて以降に更新があったか)
        • A hash resident pages table
          • permanent memeory pageがbuffer poolにのっているかとその対応するArrayの要素を管理する
      • Buffer Managerは以下のようなインタフェースを提供する
        • getAndPinPage(P)
          • P(ページ)がArrayの要素になっていれば、そのpin countをインクリメントする
            • Pのidを返す
          • PがArrayの要素になっていなければ
            • freeなArrayの要素(pin countは0)をgetする(LRUなどアルゴリズムはいくつかある).
              • ただしgetした要素がdirtyであればflushする(例えば物理ページにwrite outするなどして).
              • Arrayに空きがなければexception
            • getした要素にページを読み込んでpin countをインクリメントする(つまり1). dirtyはfalseとする.
            • BufferManagerはその要素のpin countが0にならないかぎり他のページをその要素にロードすることはない
            • resident table の情報をupdateする
              • 古いページに関する情報をdelete
              • 新しいページに関する情報をinsert
        • setDirty(P)
          • requesterがPを更新するときこれを呼ぶ. BufferManagerにArrayの対応する要素のdirtyをTrueにするよう指示する
        • unpinPage(P)
          • requesterがもはやPを必要としない時これを呼ぶ. BufferManagerはこのArray要素のpin countをデクリメントする
        • flushPage(P)
          • requesterがPをpermanent memoryに書き出したいときにこれを呼ぶ.
          • PがdirtyであったらBufferManagerはPをpermanent memoryに書く.
      • unpinしたりflushしたりするのはrequesterではなくTransaction and Recovery Managerによって行われる.
        • requesterはリクエストを出すのみ.

SQSのキューからひたすらreceiveするくん

60万件くらい入ってたSQSのキューからひたすらreceiveした人

10多重

package main

import ( 
    "sync"
    "fmt"
    "github.com/aws/aws-sdk-go/service/sqs"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    kingpin "gopkg.in/alecthomas/kingpin.v2"
)

var (
    profile       = kingpin.Flag("profile", "AWS credential(default profile if not specified)").Default("").String()
    region        = kingpin.Flag("region", "AWS region(ap-northeast-1 if not specified)").Default("ap-northeast-1").String()
)

func main() {
    kingpin.Parse() 

    // session for AWS
    sess := session.Must(session.NewSessionWithOptions(session.Options{Profile:*profile, Config:aws.Config{Region:region}}))
    
    // sqs client
    sqs_svc := sqs.New(sess)
    
    wg := &sync.WaitGroup{}
    receiveMessageInput := sqs.ReceiveMessageInput{}
    receiveMessageInput.SetMaxNumberOfMessages(10).SetQueueUrl("url")
    
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func () {
            for {
                receiveMessageOutput, _ := sqs_svc.ReceiveMessage(&receiveMessageInput)
                if len(receiveMessageOutput.Messages) == 0 { break }
                for _, mess := range receiveMessageOutput.Messages {
                    fmt.Println(*mess.Body)
                    deleteMessageInput := sqs.DeleteMessageInput{}
                    deleteMessageInput.SetReceiptHandle(*mess.ReceiptHandle).SetQueueUrl("url")
                    _, err := sqs_svc.DeleteMessage(&deleteMessageInput)
                    if err != nil {
                        panic(err)
                    }
                }
            }
            wg.Done()
        }()
    } 
    wg.Wait()
}