セキュリティインシデントが発生した場合、事前に作成したワークフローやアクションプランに基づいて動くことで対応をスムーズに行うことができます。
インシデントにおいて、このワークフローやアクションプランのような対応手順書を「プレイブック」と呼ぶことが多く、インシデント発生時にプレイブックの実行を自動化することで、対応チームの負荷を下げることができます。

コンテナのセキュリティ監視ツールである falco を利用したインシデントレスポンス自動化ツールとして kubernetes-response-engine というものがあります。

GitHub - falcosecurity-retire/kubernetes-response-engine
Contribute to falcosecurity-retire/kubernetes-response-engine development by cre...

これは Falco で検知したセキュリティイベントをメッセージブローカーである NATS から配信し、Kubernertes Serverless Framework である kubeless でそのイベントを受信、対応したプレイブックを実行するものです。
これによって、例えば次のような対応を自動化することができます。

今回はこの仕組をパクリ真似して、インシデントレスポンスプレイブックの自動化をやってみます。

今回のプレイブックの内容

ここでは、falco の kubernetes-response-engine と同じような仕組みを NATS の代わりに Apache Kafka を利用して作りたいと思います。
なぜ Kafka かというと、お家クラスタに Kafka がいるのと、Kubeless が Kafka に対応しているからです。

今回は実行するプレイブックとして、「Pod で bash プロセスが起動した場合に、sysdig でその Pod のイベントをキャプチャして保存する」という内容にしました。
セキュリティイベント自体は falco でもモニタリングできていますが、全てのログを記録したいとなると、このようなアプローチになると思います。

ざっくりと図にすると次のようになります。

以上がざっくりとした今回作った内容です。

kubeless に function を登録する

今回作った function は次の通りです。PoC なので雑なのはご了承ください…

import os
from kubernetes import client, config

class KubernetesClient(object):
    def __init__(self):
        config.load_incluster_config()
        self.v1 = client.CoreV1Api()
        self.batch_v1 = client.BatchV1Api()

    def find_node_running_pod(self, name, namespace):
        response = self.v1.read_namespaced_pod(name, namespace)
        return response.spec.node_name

    def start_sysdig_capture_for(self, pod_name, namespace, event_time, duration_in_seconds):
        job_name = 'sysdig-{}-{}'.format(pod_name, event_time)
        node_name = self.find_node_running_pod(pod_name, namespace)

        body = self._build_sysdig_capture_job_body(job_name, node_name, duration_in_seconds)

        return self.batch_v1.create_namespaced_job(namespace, body)

    def _build_sysdig_capture_job_body(self, job_name, node_name, duration_in_seconds):
        return client.V1Job(
            metadata=client.V1ObjectMeta(
                name=job_name
            ),
            spec=client.V1JobSpec(
                template=client.V1PodTemplateSpec(
                    metadata=client.V1ObjectMeta(
                        name=job_name
                    ),
                    spec=client.V1PodSpec(
                        containers=[client.V1Container(
                            name='capturer',
                            image='sysdig/sysdig',
                            command=["/bin/bash", "-c", "sysdig -S -M {} -pk -z -w /capture.scap.gz; cp /capture.scap.gz /host/tmp/".format(duration_in_seconds)],
                            image_pull_policy='Always',
                            security_context=client.V1SecurityContext(
                                privileged=True
                            ),
                            volume_mounts=[
                                client.V1VolumeMount(
                                    mount_path='/host/tmp',
                                    name='host-tmp'
                                ),
                                client.V1VolumeMount(
                                    mount_path='/host/var/run/docker.sock',
                                    name='docker-socket'
                                ),
                                client.V1VolumeMount(
                                    mount_path='/host/dev',
                                    name='dev-fs'
                                ),
                                client.V1VolumeMount(
                                    mount_path='/host/proc',
                                    name='proc-fs',
                                    read_only=True
                                ),
                                client.V1VolumeMount(
                                    mount_path='/host/boot',
                                    name='boot-fs',
                                    read_only=True
                                ),
                                client.V1VolumeMount(
                                    mount_path='/host/lib/modules',
                                    name='lib-modules',
                                    read_only=True
                                ),
                                client.V1VolumeMount(
                                    mount_path='/host/usr',
                                    name='usr-fs',
                                    read_only=True
                                ),
                                client.V1VolumeMount(
                                    mount_path='/dev/shm',
                                    name='dshm'
                                )
                            ]
                        )],
                        volumes=[
                            client.V1Volume(
                                name='dshm',
                                empty_dir=client.V1EmptyDirVolumeSource(
                                    medium='Memory'
                                )
                            ),
                            client.V1Volume(
                                name='host-tmp',
                                host_path=client.V1HostPathVolumeSource(
                                    path='/tmp'
                                )
                            ),
                            client.V1Volume(
                                name='docker-socket',
                                host_path=client.V1HostPathVolumeSource(
                                    path='/var/run/docker.sock'
                                )
                            ),
                            client.V1Volume(
                                name='dev-fs',
                                host_path=client.V1HostPathVolumeSource(

                                    path='/dev'
                                )
                            ),
                            client.V1Volume(
                                name='proc-fs',
                                host_path=client.V1HostPathVolumeSource(
                                    path='/proc'
                                )
                            ),

                            client.V1Volume(
                                name='boot-fs',
                                host_path=client.V1HostPathVolumeSource(
                                    path='/boot'
                                )
                            ),
                            client.V1Volume(
                                name='lib-modules',
                                host_path=client.V1HostPathVolumeSource(
                                    path='/lib/modules'
                                )
                            ),
                            client.V1Volume(
                                name='usr-fs',
                                host_path=client.V1HostPathVolumeSource(
                                    path='/usr'
                                )
                            )
                        ],
                        node_name=node_name,
                        restart_policy='Never'
                    )
                )
            )
        )

def handler(event, context):
    duration = 60
    print(event['data'])
    if event['data']['output_fields']['proc.cmdline'] == 'bash':
        print("Bash detected")
        client = KubernetesClient()
        pod_name = event['data']['output_fields']['k8s.pod.name']
        event_time = event['data']['output_fields']['evt.time']
        namespace = event['data']['output_fields']['k8s.ns.name']
        client.start_sysdig_capture_for(
                pod_name, namespace, event_time, duration
                )
        print("Capture Started.")

    return event['data']

コード量が多く見えますが、Falco のアラートから bash プロセスの場合に、sysdig Pod を作る Job を作成して、結果をノードに保存しているだけです。
実際にプレイブックとして活用するには、Kubernetes Client の作成などをライブラリとして切り出すことで、見栄えがよくなると思います。今回は PoC なので…。
sysdig で取得した内容は、実際には S3 にアップロードするなどの内容に変更すると良いと思います。今回は面倒だったのでノードに保存しています。

Kubernetes API を叩いて Job を作成するので、ServiceAccount が必要になりますので、用意します。

apiVersion: v1
kind: ServiceAccount
metadata:
  name: run-sysdig-job
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: run-sysdig-job
rules:
  - apiGroups: ["batch", "extensions"]
    resources: ["jobs", "job/status"]
    verbs: ["*"]
  - apiGroups: [""]
    resources: ["pods", "pods/binding", "pods/log", "pods/status"]
    verbs: ["get", "list", "create"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: run-sysdig-job
roleRef:
  kind: ClusterRole
  name: run-sysdig-job
  apiGroup: rbac.authorization.k8s.io
subjects:
  - kind: ServiceAccount
    name: run-sysdig-job
    namespace: kubeless

ServiceAccount と RBAC を作成したら kubeless に先程の function を登録します。

$ kubeless function deploy sysdig --runtime python3.8 --from-file sysdig.py \
                --handler sysdig.handler --dependencies requirements.txt \
                --service-account run-sysdig-job

kubeless function ls で function が登録されていれば OK です。

❯ kubeless function ls
NAME    NAMESPACE       HANDLER         RUNTIME         DEPENDENCIES                    STATUS
sysdig  kubeless        sysdig.handler  python3.8       cachetools==4.2.1               1/1 READY
                                                        certifi==2020.12.5
                                                        chardet==4.0.0
                                                        google-auth==1.27.0
                                                        idna==2.10
                                                        kubernetes==12.0.1
                                                        oauthlib==3.1.0
                                                        pyasn1==0.4.8
                                                        pyasn1-modules==0.2.8
                                                        python-dateutil==2.8.1
                                                        PyYAML==5.4.1
                                                        requests==2.25.1
                                                        requests-oauthlib==1.3.0
                                                        rsa==4.7.1
                                                        six==1.15.0
                                                        urllib3==1.26.3
                                                        websocket-client==0.57.0

続いて kafka からイベントを受信するためにトリガーを作成します。
kubeless では Kafka の topic とそれに対応する function を設定することで、PubSub として機能します。

$ kubeless trigger kafka create sysdig-capture \
         --function-selector created-by=kubeless,function=sysdig \
         --trigger-topic falco

こちらも登録を確認できば OK です。

❯ kubeless trigger kafka ls
NAME            NAMESPACE       TOPIC   FUNCTION SELECTOR
sysdig-capture  kubeless        falco   created-by=kubeless,function=sysdig

以上で準備は完了です。

テストする

では、プレイブックが自動で実行されるか確認してみます。
適当な Pod に attach して bash を実行します。

$ kubectl exec -it test -- bash

test# curl https://example.com

すると function を実行している Pod で成功したようなログが流れます。

sysdig-5fffdb665d-lpbk6 sysdig {'output': '06:26:51.301051801: Notice A shell was spawned in a container with an attached terminal (user=root user_loginuid=-1 k8s.ns=kubeless k8s.pod=test container=4d93e05ff69e shell=bash parent=runc cmdline=bash terminal=34816 container_id=4d93e05ff69e image=nicolaka/netshoot) k8s.ns=kubeless k8s.pod=test container=4d93e05ff69e k8s.ns=kubeless k8s.pod=test container=4d93e05ff69e', 'priority': 'Notice', 'rule': 'Terminal shell in container', 'time': '2021-02-20T06:26:51.301051801Z', 'output_fields': {'container.id': '4d93e05ff69e', 'container.image.repository': 'nicolaka/netshoot', 'evt.time': 1613802411301051801, 'k8s.ns.name': 'kubeless', 'k8s.pod.name': 'test', 'proc.cmdline': 'bash', 'proc.name': 'bash', 'proc.pname': 'runc', 'proc.tty': 34816, 'user.loginuid': -1, 'user.name': 'root'}}
sysdig-5fffdb665d-lpbk6 sysdig Bash detected
sysdig-5fffdb665d-lpbk6 sysdig Capture Started.
...

ノードには sysdig のキャプチャファイルが保存され、意図した通りに実行されていることが確認できました。

❯ sysdig -r capture.scap | grep example.com
675325 15:27:06.135277896 0 curl (371056) < execve res=0 exe=curl args=https://example.com. tid=371056(curl) pid=371056(curl) ptid=370416(bash) cwd= fdlimit=1048576 pgft_maj=1 pgft_min=29 vm_size=988 vm_rss=4 vm_swap=0 comm=curl cgroups=cpuset=/kubepods.slice/kubepods-besteffort.slice/kubepods-besteffort-pod8b99e... env=KUBERNETES_SERVICE_PORT_HTTPS=443.KUBERNETES_SERVICE_PORT=443.HOSTNAME=privil... tty=34816 pgid=122 loginuid=-1

注意点など

今回は sysdig の Pod を実行しましたが、その際にイメージの Pull と起動までに少し時間がかかりました(sysdig は初回起動時に Kernel モジュールをダウンロードする)。そのため、実際に攻撃を検知してから若干のタイムラグが生まれてしまいます。攻撃がスクリプト化されているなら、イベントが全く取れなかった…ということになると思います。
この対応として、イメージをキャッシュしておいたり、そもそも別のアプローチを取る、などが考えられます。

まとめ

簡単にですが、Falco, Kafka, Kubeless を使ったインシデントプレイブックの自動化を紹介しました。
プレイブックをコードとして残し、それを自動化することで、対応やメンテコストを下げつつ、素早く正確なインシデントレスポンスができると思います。
似たような仕組みや、「こういうこともできるよ!」という話も聞きたいので、もしいればコソッと教えて下さい。