Microservices Get Started その1

このエントリに書くこと

このエントリは以下のようなことについて自分が最近まなんだ範囲で書く。

自分の環境は以下:

- macOS 10.13.4
- cpython 3.6.1
- nameko 2.9.1
- Docker for Mac Version 18.06.0-ce-mac70


introduction

Microservicesというワードはよく聞くし、いくつかブログや本も齧ったりしたけど、 全く自分で構築したこともないということで、いつか仕事で役立つ時がくるかもしれないしローカル環境で作ってみようと思う。

NamekoというPython製のフレームワークを使う。
可愛らしい名前だが、初心者にやさしいチュートリアル的なものが一切なく、結構使い始めるのに困るので、そういった意味でもこのエントリを書いている。 本エントリはマジでhello worldしかしてない。

GitHub - nameko/nameko: Python framework for building microservices


MicroservicesとSOA

Microservicesを始めようとする時、絶対気になるのが、 SOA(Service oriented architecture)との違いかと思われる。自分もいまだに気になる。

SOAは、S/Wエンティティ同士が細かく分割されていて、それぞれがメッセージのやりとり(SOAPとかHTTP)でコミュニケーションする。

MicroservicesはこのSOAのアドバンスドというか延長線上にいる。

ただし両者は、目的が大きく異なる。 誤解をおそれない感じで書いてみる。

SOAについて

巨大な業務システムを構築する際に、さまざまなステークホルダがいて、 それらが独自に実装したサブシステム同士が通信することを考える。
取引処理と決済処理と在庫処理が互いにデータを参照しあう時、取引システムと決済システムと在庫システムが全く別の組織が開発をしていても、 共通のインタフェースがあると、それぞれの実装はブラックボックスでもよくて、実装言語はなんでもいいしプラットフォームもそれぞれが得意なものを選択できる。 開発体制を分断できる。
ブラックボックスの先が超巨大で複雑なインフラでも、超スパゲティコードでもSOASOAである。
こういうこともあり、SOAは比較的S/Wの内部的なアーキテクチャに関心がない。

Microservicesについて

こちらはもっと開発品質の向上にフォーカスしている。 デリバリの効率をあげ、S/Wの開発そのものの改善を目指すもの。 したがって必然的に、S/Wの内部的なアーキテクチャに強い関心がある。
個々のサービスは、自律的かつ非依存的で、管理可能なほど小さな単位で完結しており、デプロイが容易でなければならない。 SOAPみたいなXMLのやりとりではなく,RESTやRPCを用いてコミュニケーションする。
サービス間で同じDBを参照するとかコードをシェアするとかは基本的になしである。 このように、非常に小さなサイズのサービスを包括的に管理する必要があるという意味で、 モノリシックなシステムと比較して運用がすごく難しい。
Microservicesの文脈ではコンテナが前提となっているが、運用の困難さを解決すべくOSSがいくつも出ている。


Nameko

さて、PythonでMicroservicesを書く時、Namekoというフレームワークでかなり簡単に始められる。 ちなみに、なめこが群生している様が、小さなサービスが群れているMicroservicesとなんか似ているのが由来ということらしい。 最初に触るにはよさそうである。 ただ、ドキュメントにチュートリアル的な初心者にやさしいものがないので、少し困る。

RPCについて

MicroservicesではRPCでサービス間のコミュニケーションをすることが必要となる。 これは、ネットワーク越しに存在するコードを、見かけ上はローカルのものを呼ぶかのように実行する。 RESTなどと比較して難しいのが、自分が書いているコードで、気をつけないと意図せずリモートとの通信を連発する可能性があること。 外部のリソースを使うのは高くつくものであるので、リモートコールする箇所は意識しておく必要がある。

Message Queuingについて

Namekoでは、RPCによるリモートコールを実行すると、Message Queueにリクエストを詰める。 呼び出し先のサービスでは、このキューからメッセージをconsumeしてタスクを実行する。 consumerは水平スケールすることでパフォーマンスを確保することが容易。

RabbitMQについて

メッセージのブローカとして、NamekoではRabbitMQが利用できる。 自分はDocker for Macでお手軽にローカルにRabbigMQを立てることにする。


Namekoを使って最初のMicroservices

初めてやるので、普段の環境とは分けておいた方が無難だろう。 てきとうに検証用のディレクトリを掘ってそこに移動する。

$ mkdir first_nameko; cd first_nameko

専用の仮想環境を作っておく。 (自分はcpython 3.6.1 を使った)

$ pyenv-virtualenv 3.6.1 first_nameko
$ pyenv local first_nameko

Namekoをインストールする

$ pip install nameko

まずは最初の HelloWold サービスを作ってみる。 公式ドキュメントから借用して、 helloworld.py を以下の内容で作成する。

from nameko.rpc import rpc

class GreetingService:
    name = "greeting_service"

    @rpc
    def hello(self, name):
        return "Hello, {}!".format(name)

サービス名は、 greeting_service である
@rpc デコレータによって、このメソッドがRPC経由で呼び出されるようにする。
次に、以下の config.yaml を同じ階層に置く。

AMQP_URI: 'pyamqp://guest:guest@localhost'

AMQP とは、NamekoがRPCをやりとりする上で利用するプロトコルらしく、今回はメッセージのブローカとしてRabbitMQを使うので、そのエンドポイントを記述。 RabbitMQにはデフォルトで guest:guest なユーザがいる。

次に、RabbitMQ自体はDockerでさくっと立てる。

$ docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq

そして、初めてのサービスを起動する。

$ nameko run helloworld --config config.yaml
starting services: greeting_service
Connected to amqp://guest:**@127.0.0.1:5672//

Namekoには cliが付属しているので、使ってみる。

$ nameko shell
Nameko Python 3.6.1 (default, Sep 26 2017, 15:11:41)
[GCC 4.2.1 Compatible Apple LLVM 8.0.0 (clang-800.0.42.1)] shell on darwin
Broker: pyamqp://guest:guest@localhost
>>>

さきほど起動したサービスに定義したメソッドを呼び出してみる。

>>> n.rpc.greeting_service.hello("ujun")
'Hello, ujun!'


このあと

Namekoを使って簡単にMicroserviceを構築できそうな雰囲気があるので、どんどん使っていきたい。 nameko.web にHTTPハンドラを作るモジュールがあるので、RESTなエンドポイントを定義してWebアプリっぽいものも簡単に作れる。


つづく

TerraformでAWS CodeDeployのアプリケーションを作る

リリース時に手で展開しているバッチを、いい加減なんらかのデプロイツールでやりたい。

できる限りTerraformで環境構築したいというのもある。

参考: https://www.terraform.io/docs/providers/aws/r/codedeploy_app.html#

素の状態から簡単にアプリケーション作るなら以下のような感じ。

# アプリケーション名
resource "aws_codedeploy_app" "sample-app" {
  name = "sample"
}

# リリース時に参照するconfiguration
# 設定項目はリリースの際にどの程度のホストが正常起動している必要があるかのみ
resource "aws_codedeploy_deployment_config" "sample-config" {
  deployment_config_name = "sample-config"

  minimum_healthy_hosts {
    type  = "FLEET_PERCENT"
    value = 50
  }
}

# リリース対象のサーバを識別するための設定
# とりあえず `"application":"sample"` というタグがついたEC2インスタンスを対象としている
resource "aws_codedeploy_deployment_group" "sample-group" {
  app_name               = "${aws_codedeploy_app.sample-app.name}"
  deployment_group_name  = "sample-group"
  service_role_arn       = "${aws_iam_role.sample_codedeploy.arn}"
  deployment_config_name = "${aws_codedeploy_deployment_config.sample-config.id}"

  ec2_tag_set {
    ec2_tag_filter {
      type  = "KEY_AND_VALUE"
      key   = "application"
      value = "sample"
    }
  }
}

# 必要なIAMロールの設定
resource "aws_iam_role" "sample_codedeploy" {
  name = "sample_codedeploy"

  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": "codedeploy.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
EOF
}

resource "aws_iam_role_policy_attachment" "AWSCodeDeployRole" {
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSCodeDeployRole"
  role       = "${aws_iam_role.sample_codedeploy.name}"
}

PowerShell で基本的なコンピュータリソースのメトリクスをとる(プロセスごと)

Windows Server 2016の検証を行なっていてもろもろとるために調べた。

(実行前に Get-Counter -ListSet process | Select-Object -ExpandProperty Paths をやらないとエラーとなることがある 参考: 同一PSで、「Get-Counter '\Process(*)\% Processor Time'」 を複数回実行するとエラーが出る )

  • CPU使用率
(Get-Counter "\Process(*)\% Processor Time").CounterSamples | select InstanceName, @{label="CPU %";expression={$_.CookedValue / $Env:NUMBER_OF_PROCESSORS -as [int]}} | where {$_.InstanceName -ne "idle" -and $_.InstanceName -ne "_total"} | sort "CPU %" -Descending
  • Memory使用率
(Get-Counter "\Process(*)\working set - private").CounterSamples | select InstanceName, @{label="memory MBytes";expression={$_.CookedValue/1024/1024}}, @{label="memory %";expression={$_.CookedValue / (Get-WmiObject win32_computersystem).totalphysicalmemory }} | where {$_.InstanceName -ne "idle" -and $_.InstanceName -ne "_total"} | sort "memory %" -Descending
  • IO Write Bytes/sec
(Get-Counter "\Process(*)\IO Write Bytes/sec").CounterSamples | select InstanceName, @{label="IO Write MBytes/sec";expression={$_.CookedValue}} | where {$_.InstanceName -ne "idle" -and $_.InstanceName -ne "_total"} | sort "IO Write MBytes/sec" -Descending
  • IO Read Bytes/sec
(Get-Counter "\Process(*)\IO Read Bytes/sec").CounterSamples | select InstanceName, @{label="IO Read MBytes/sec";expression={$_.CookedValue}} | where {$_.InstanceName -ne "idle" -and $_.InstanceName -ne "_total"} | sort "IO Read MBytes/sec" -Descending
  • Thread Count
(Get-Counter "\Process(*)\Thread Count").CounterSamples | select InstanceName, @{label="Thread Count";expression={$_.CookedValue}} | where {$_.InstanceName -ne "idle" -and $_.InstanceName -ne "_total"} | sort "Thread Count" -Descending

Get-Counterで取得できるメトリクスは以下等で確認

Get-Counter -listset *
Get-Counter -ListSet process | Select-Object -ExpandProperty Paths

AWS Systems Manager でタスクのスケジューリングを置き換えれる

SSMとCloudWatch Eventで置き換えられそう。

Terraform の 以下を参考にする

AWS: aws_cloudwatch_event_target - Terraform by HashiCorp

resource "aws_iam_role" "ecs_events" {
  name = "ecs_events"
  assume_role_policy = <<DOC
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": "events.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
DOC
}

resource "aws_iam_role_policy" "ecs_events_run_task_with_any_role" {
  name = "ecs_events_run_task_with_any_role"
  role = "${aws_iam_role.ecs_events.id}"
  policy = <<DOC
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": "ecs:RunTask",
            "Resource": "${replace(aws_ecs_task_definition.task_name.arn, "/:\\d+$/", ":*")}"
        }
    ]
}
DOC
}

resource "aws_cloudwatch_event_target" "ecs_scheduled_task" {
  target_id = "run-scheduled-task-every-hour"
  arn       = "${aws_ecs_cluster.cluster_name.arn}"
  rule      = "${aws_cloudwatch_event_rule.every_hour.name}"
  role_arn  = "${aws_iam_role.ecs_events.arn}"

  ecs_target = {
    task_count = 1
    task_definition_arn = "${aws_ecs_task_definition.task_name.arn}"
  }

  input = <<DOC
{
  "containerOverrides": [
    {
      "name": "name-of-container-to-override",
      "command": ["bin/console", "scheduled-task"]
    }
  ]
}
DOC
}

Relational DBMS Internals - Ch.2

最近読んでいるので、2章のまとめ。

http://pages.di.unipi.it/ghelli/bd2/DBMS-Internals.pdf

  • Permanent Memory And Buffer Management
    • DBMSを実装する上で最初に解決するべきは、システムを構成する複数のコンポーネントに依存せずストレージを抽象的に扱うことのできるレイヤの提供である
      • Permanent Memory ManagerとBuffer Managerによって実現される
    • 記憶領域の種類
      • Main memory
      • Permanent memory with magnetic disks
      • Temporary memory with NAND flash memory
    • Permanent Memory Manager
      • diskを抽象化するレイヤを提供する
      • 各DBを物理ファイル(物理ページ)の集合で表現する
      • DBとは実データやインデックスの物理ページを含んだディレクトリである
      • 各レコードは実ファイル上に構成される論理ファイルとなる
    • Buffer Manager
      • Permanent Memory のページをMain memoryと対応される役割
      • トランザクションが必要としているページを取得するのに責任を持つ
      • クエリのパフォーマンスはTemporary memory に読み込むページの数に依存する
      • 以下BufferManagerの構成要素
        • The buffer pool
          • Permanen memory のコピーやその保持情報を格納するArray
          • 固定サイズなので、適切なアルゴリズムにしたがってfreeして空き容量を確保する必要がある
          • Arrayの各要素は以下の変数を持つ (最初はすべての要素のpin countは0, dirtyはfalse)
            • the pin count(その要素が含むページが現在参照されている数)
            • dirty(そのページが、buffer poolに読み込まれて以降に更新があったか)
        • A hash resident pages table
          • permanent memeory pageがbuffer poolにのっているかとその対応するArrayの要素を管理する
      • Buffer Managerは以下のようなインタフェースを提供する
        • getAndPinPage(P)
          • P(ページ)がArrayの要素になっていれば、そのpin countをインクリメントする
            • Pのidを返す
          • PがArrayの要素になっていなければ
            • freeなArrayの要素(pin countは0)をgetする(LRUなどアルゴリズムはいくつかある).
              • ただしgetした要素がdirtyであればflushする(例えば物理ページにwrite outするなどして).
              • Arrayに空きがなければexception
            • getした要素にページを読み込んでpin countをインクリメントする(つまり1). dirtyはfalseとする.
            • BufferManagerはその要素のpin countが0にならないかぎり他のページをその要素にロードすることはない
            • resident table の情報をupdateする
              • 古いページに関する情報をdelete
              • 新しいページに関する情報をinsert
        • setDirty(P)
          • requesterがPを更新するときこれを呼ぶ. BufferManagerにArrayの対応する要素のdirtyをTrueにするよう指示する
        • unpinPage(P)
          • requesterがもはやPを必要としない時これを呼ぶ. BufferManagerはこのArray要素のpin countをデクリメントする
        • flushPage(P)
          • requesterがPをpermanent memoryに書き出したいときにこれを呼ぶ.
          • PがdirtyであったらBufferManagerはPをpermanent memoryに書く.
      • unpinしたりflushしたりするのはrequesterではなくTransaction and Recovery Managerによって行われる.
        • requesterはリクエストを出すのみ.

SQSのキューからひたすらreceiveするくん

60万件くらい入ってたSQSのキューからひたすらreceiveした人

10多重

package main

import ( 
    "sync"
    "fmt"
    "github.com/aws/aws-sdk-go/service/sqs"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    kingpin "gopkg.in/alecthomas/kingpin.v2"
)

var (
    profile       = kingpin.Flag("profile", "AWS credential(default profile if not specified)").Default("").String()
    region        = kingpin.Flag("region", "AWS region(ap-northeast-1 if not specified)").Default("ap-northeast-1").String()
)

func main() {
    kingpin.Parse() 

    // session for AWS
    sess := session.Must(session.NewSessionWithOptions(session.Options{Profile:*profile, Config:aws.Config{Region:region}}))
    
    // sqs client
    sqs_svc := sqs.New(sess)
    
    wg := &sync.WaitGroup{}
    receiveMessageInput := sqs.ReceiveMessageInput{}
    receiveMessageInput.SetMaxNumberOfMessages(10).SetQueueUrl("url")
    
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func () {
            for {
                receiveMessageOutput, _ := sqs_svc.ReceiveMessage(&receiveMessageInput)
                if len(receiveMessageOutput.Messages) == 0 { break }
                for _, mess := range receiveMessageOutput.Messages {
                    fmt.Println(*mess.Body)
                    deleteMessageInput := sqs.DeleteMessageInput{}
                    deleteMessageInput.SetReceiptHandle(*mess.ReceiptHandle).SetQueueUrl("url")
                    _, err := sqs_svc.DeleteMessage(&deleteMessageInput)
                    if err != nil {
                        panic(err)
                    }
                }
            }
            wg.Done()
        }()
    } 
    wg.Wait()
}    
 

packer buildしてできたamiだけ取得(packerの出力変わると使えない)

#!/usr/bin/env python

import subprocess
import re

def run(cmd):
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    stdout = []
    while True:
        line = p.stdout.readline()
        stdout.append(line)
        if not line and p.poll() is not None:
            for text in stdout:
                match = re.search('ap-northeast-1:\s{1}(.*)', str(text.decode("utf-8")))
                if match:
                    print(match.group(1), end = "")
            break
    return ''.join(str(stdout))

if __name__ == '__main__':
    run("packer build -var-file=var.json template.json") 

プレースホルダを含んだtfを使うなど

python exec_packer.py | xargs -I{} sed -e 's/{{AMI_ID}}/{}/g' test.tf > test_e.tf