misc.tech.notes

主に技術的な雑記的な

PythonでLambda Functionを書く時にデコレータでイベントソース毎の共通処理をすると便利という話

この記事はServerless2 Advent Calendar 2018の21日目の記事です。

qiita.com

先日 AWS Lambda Custom Runtimes芸人 Advent Calendar 2018 の方はちょっとはっちゃけすぎた感もありつつ実戦にすぐに役に立つような内容ではなかったですが、今回はマジメにより実戦的な内容で行きたいと思いますw

marcy.hatenablog.com

はじめに

私は普段は主にPythonでLambda Functionを書いているのですが、イベントの処理方法を予め決めてデコレータで共通処理を行うようにしています。それによってServerlessな開発でよく話題になるトレーサビリティの問題やエラーハンドリングの煩雑さなどをある程度上手く解決できているので紹介したいと思います。

なお、出てくるコードは実際に使用しているものに近いですが、イメージしやすいようにあまり抽象化せずにその場で書いているものなので、動作保証はしませんのであしからず。

イベントの処理方法を予め決める

たとえばKinesis Streamsを処理するFunction、SNSからのPublishで動くFunction、API Gatewayから呼び出されるFunctionとでは受け取るイベントの形式が異なりますし、エラー時の挙動も異なります。そんな中で、ログをトレースするためのIDを仕込んだりをFunctionの中でやると冗長になりますし、エラー処理方法もマチマチになったりしがちです。それを上手くまとめるのにPythonだと特にDecoratorを使うのが都合が良いのです。

ロギング

全てのDecoratorに必ず入れるのが入ってきたEventと返り値をログに残す処理です。これはハイトラフィックなサービスでは必ずしも全て残すべきかは議論の余地がありますが、そうでなければ必ずやります。この際に、イベントの処理方法(Streams, SNS, API GWなど)毎に一連のログをトレースするためのIDを持たせる場所を決めておき、それが後で検索可能なようにログに吐かせます。

ログの出力先は一旦CloudWatch Logsに吐いて後で集約でも良いのですが、残念ながらCloudWatch Logsのログを別のKinesis Stream/Firehoseへ繋ぎこむSubscriptionの設定が非常に煩雑なので、私はLambdaから直接FirehoseかDatadog Logsに突っ込むことが多いです。オススメはDatadog Logsです。*1Pythonの場合はログ出力が非同期ではない*2のでこの方式はログが詰まれば処理も詰まるので良し悪しではありますが・・・

docs.aws.amazon.com

docs.datadoghq.com

Datadog Logsはこんな感じでTrace IDで串刺しで検索したりFunctionでフィルタしたりが簡単にできて便利。

f:id:FumblePerson:20181221180121p:plain

Datadog Logsにログを投げ込むためのライブラリも公開してるので良かったらどうぞ(本番導入済みのものです)

github.com

pypi.org

例: SNSで呼び出されるFunction

SNSでPublishされたイベントは SubjectMessage という2つの要素を持っています。例えばこのうちの Subject をTraced IDのようなメタデータを入れる場所とし、 MessageJSONやProtocolBufferで実際のイベントペイロードを表現するといった形です。最近はgRPCじゃなくてもProtocolBufferを使うとイベントの定義が発行側と購読側で共有できるので良いなーと思ったりしてます。直近のケースでは性能要件がかなり高く、PythonだとProtocolBufferの速いデシリアライザが見つからず、ujsonが未だに圧倒的な速さを見せつけてきたのでそっちにしちゃいましたがw*3

pypi.org

def sns_handler(func):
    @functools.wraps(func)
    def wrapper(event, context):
        ret = []
        for record in event['Records']:
            logger = get_logger()
            logger.set_trace_id(record['Sns']['Subject'])
            message = json.loads(record['Sns']['Message'])
            logger.info(message)
            try:
                result = func(message, context)
            except Exception as e:
                logger.exception(str(e))
                raise e
            logger.info(result)
            ret.append(result)
        return ret
    return wrapper

@sns_handler
def lambda_handler(message, context):
     pass

エラー処理

原則

エラー処理は原則AWS側が提供しているエラー処理機構に極力乗ります。非同期実行のLambdaでは3回までリトライし、それでもエラーになればDead Letter Queueの設定がなされていればそちらに流れます。*4コントロールするのはLambdaをFailさせて失敗をさせる条件です。未処理例外が発生すればFailするので、「基本的にはFailさせたいなら誰も処理しない例外を投げる」をベースにしています。とはいえ、ログにすら残らないのは問題なので、上記のSNSの例のようにDecorator内でそこは最低限カバーします。

try:
    result = func(message, context)
except Exception as e:
    logger.exception(str(e))
    raise e

API Gatewayのような同期実行のものは素直に500などを返すようにします。

例: API Gatewayから呼び出されるFunction

def api_handler(func):
    @functools.wraps(func)
    def wrapper(event, context):
        trace_id = event['headers'].get('X-Example-Trace-Id')
        logger = get_logger()
        logger.set_trace_id(trace_id)
        logger.info(event)
        try:
            ret = func(event, context)
            if 'headers' not in ret:
                ret['headers'] = {}
            ret['headers']['X-Example-Trace-Id'] = trace_id
        except Exception as e:
            logger.exception(str(e))
            ret = {
                'statusCode': 500,
                'headers': {'X-Example-Trace-Id': trace_id},
                'body': json.dumps({'result': str(e)})
            }
        return ret
    return wrapper

Kinesis Streamsのようなエラー時の挙動が特殊なものはそのイベントの処理方法を含め検討します。

例: Kinesis Streamsを処理するFunction

Kinesis StreamsではBatch Sizeで指定した最大数までのデータが一回の処理でまとめてFunctionに渡されてきます。ここで途中でLambdaをFailさせてしまうとその時に取得した全てのデータが再処理の対象となってしまい、これは都合の悪いことがあります。なので、実処理を行う関数にはBatchで取得したレコードを1件ずつ処理させ、順序保証したいもの以外の再処理可能なエラーはそのデータだけ再度Streamに入れ直すというようなルールで実装することがあります。

もちろんこれは要件次第です。SQSトリガーがない時にはKinesis Streamsをキュー代わりに使うケースも多かったので、SQSトリガーがある今は順序保証が必要なければSQSトリガー使ったほうが自然に実装できます。ただ、SQSトリガーもLambdaがFailすると一回のBatchで取得したデータが全てキューに戻るのでそこは注意が必要です。

def stream_handler(func):
    @functools.wraps(func)
    def wrapper(event, context):
        ret = []
        for record in event['Records']:
            payload = json.loads(base64.b64decode(record['kinesis']['data']))
            logger = get_logger()
            logger.set_trace_id(payload['trace_id'])
            logger.info(payload)
            try:
                result = func(payload, context)
            except Exception as e:
                logger.exception(str(e))
                client.put_record(
                    StreamName=payload['stream_name'],
                    Data=json.dumps(payload),
                    PartitionKey=payload['trace_id'])
            logger.info(result)
            ret.append(result)
        return ret
    return wrapper

@stream_handler
def lambda_handler(payload, context):
     pass

最後に

いかがでしょうか。Decoratorを使うとここまでに書いたメリット以外にも、Functionの実処理がシンプルになって見通しも良くなる、Decoratorを見るとそのFunctionが何からのイベントを処理するかパッと見で分かる、といったメリットもあります。

ごく簡単なもの以外は、PythonでLambda Functionを書くときにはFunctionを書く前にイベントの処理方法を決めてDecoratorから書き始めるとLambda Functionの開発がより良いものになるかもしれないので、是非やってみてください。

*1:一旦CloudWatch Logsに吐いた場合もDatadog Logsに流すLambda Functionが公式に公開されています

*2:非同期に実装する方法もありますが素直に書くとという話

*3:そんなに性能要件高いならGoで書けよというのはホントその通りなんですが、言語の縛りのある案件だったんですよ・・・

*4:ちなみにここだけの話ですが、Lambda自体の起動失敗はぶっちゃけちょいちょい起こります。ですが、大半はリトライで上手く行くケースがほとんどで、万が一取りこぼした時のためにDLQは設定しておくことを強く推奨します。