AWSのセキュリティログをAthenaで検索する

Posted on 2023/05/04

ToC

セキュリティログの活用

セキュリティインシデントの発生時の対応や各種証跡の保全のためにログを取得することは、これまでも一般的に行われてきました。 特にクラウドでは、セキュリティに関する各種のログが簡単に取得できる各種のサービスやツールが公開されており、企業や個人でも活用されている方は多いと思います。

その一方で、データ量も多く、万一のために取得しているという認識から、なかなか活用に至らないケースも多くあるのではないでしょうか。 昨今では、発見的統制(Detective Control)という言葉も出てきており、セキュリティの問題と潜在的な脅威を見つけ出すことも必要となっていると言われております。

問題の発見には、簡単にログを検索して集計する必要があるため、AWSのAthenaによる各種セキュリティログ検索を実施してみます。

検索対象のログ

AWSの機能を利用することで、ほぼ自動的にS3上に蓄積されるログの検索をAthenaで実施してみたいと思います。

内容はAWSのサービスログのクエリとして、すでに公開されているため、同じことを実施しても芸が無いので、 この内容をCloud Formationにて構成管理できる状態を作ることを試みました。

CloudTrailログ

特に変わった部分はありませんが、強いていうとすればAWS Glue Data Catalogを利用する際には、 Catalog IdにはAWSアカウントIDを入力するようです。また、Athenaパーティションを都度構築することは手間なので、 パーティション射影の機能を活用しました。

CloudTrailLogsTable:
Type: AWS::Glue::Table
Properties:
  CatalogId: !Ref "AWS::AccountId"
  DatabaseName: [Glueデータベース名]
  TableInput:
    Name: cloud_trail_logs
    TableType: EXTERNAL_TABLE
    Parameters:
      classification: cloudtrail
      compressionType: gzip
      typeOfData: file
      serialization.format: 1
      projection.enabled: true
      projection.account.type: enum
      projection.account.values: [検索対象のAWSアカウントID]
      projection.region.type: enum
      projection.region.values: us-west-2,af-south-1,ap-east-1,ap-south-1,ap-northeast-2,ap-southeast-1,ap-southeast-2,ap-northeast-1,ca-central-1,eu-central-1,eu-west-1,eu-west-2,eu-south-1,eu-west-3,eu-north-1,me-south-1,sa-east-1
      projection.dt.type: date
      projection.dt.range: NOW-4MONTHS,NOW
      projection.dt.format: yyyy/MM/dd
      projection.dt.interval: 1
      projection.dt.interval.unit: DAYS
      storage.location.template: !Sub 's3://[バケット名]/AWSLogs/${!account}/CloudTrail/${!region}/${!dt}'
    StorageDescriptor:
      Location: !Sub 's3://[バケット名]/AWSLogs/'
      Columns:
        - Name: eventversion
          Type: string
        - Name: useridentity
          Type: "struct<type:string,principalid:string,arn:string,accountid:string,invokedby:string,accesskeyid:string,username:string,sessioncontext:struct<attributes:struct<mfaauthenticated:string,creationdate:string>,sessionissuer:struct<type:string,principalid:string,arn:string,accountid:string,username:string>>>"
        - Name: eventtime
          Type: string
        - Name: eventtype
          Type: string
        - Name: eventsource
          Type: string
        - Name: eventname
          Type: string
        - Name: awsregion
          Type: string
        - Name: sourceipaddress
          Type: string
        - Name: useragent
          Type: string
        - Name: errorcode
          Type: string
        - Name: errormessage
          Type: string
        - Name: requestparameters
          Type: string
        - Name: responseelements
          Type: string
        - Name: additionaleventdata
          Type: string
        - Name: requestid
          Type: string
        - Name: eventid
          Type: string
        - Name: resources
          Type: "array<struct<arn:string,accountid:string,type:string>>"
        - Name: apiversion
          Type: string
        - Name: readonly
          Type: string
        - Name: recipientaccountid
          Type: string
        - Name: serviceeventdetails
          Type: string
        - Name: sharedeventid
          Type: string
        - Name: vpcendpointid
          Type: string
      SerdeInfo:
        SerializationLibrary: com.amazon.emr.hive.serde.CloudTrailSerde
        Parameters:
          serialization.format: 1
      InputFormat:  com.amazon.emr.cloudtrail.CloudTrailInputFormat
      OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
      StoredAsSubDirectories: true
    PartitionKeys:
      - Name: account
        Type: string
      - Name: region
        Type: string
      - Name: dt
        Type: string

CloudFrontアクセスログ

こちらもCloudTrailログとほぼ同様ですが、ちょっとしたテクニックを活用しています。 storage.location.templateの箇所は、パーティションを${パーティション名}の形式で設定する必要がありますが、 同時に!Subでバケット名を置換することも多いと思います。そういった際にCloudformationでは、!を付けてつけることで エスケープされて置換対象となりません。

CloudFrontAccessLogsTable:
Type: AWS::Glue::Table
Properties:
  CatalogId: !Ref "AWS::AccountId"
  DatabaseName: [Glueデータベース名]
  TableInput:
    Name: cloud_front_access_logs
    TableType: EXTERNAL_TABLE
    Parameters:
      compressionType: gzip
      typeOfData: file
      serialization.format: 1
      projection.enabled: true
      skip.header.line.count: 2
      projection.distribution.type: injected
      storage.location.template: !Sub 's3://[バケット名]/AWSLogs/cloudfront/${!distribution}'
    StorageDescriptor:
      Location: !Sub 's3://[バケット名]/AWSLogs/cloudfront/'
      Columns:
        - Name: request_date
          Type: date
        - Name: time
          Type: string
        - Name: location
          Type: string
        - Name: bytes
          Type: bigint
        - Name: request_ip
          Type: string
        - Name: method
          Type: string
        - Name: host
          Type: string
        - Name: uri
          Type: string
        - Name: status
          Type: int
        - Name: referrer
          Type: string
        - Name: user_agent
          Type: string
        - Name: query_string
          Type: string
        - Name: cookie
          Type: string
        - Name: result_type
          Type: string
        - Name: request_id
          Type: string
        - Name: host_header
          Type: string
        - Name: request_protocol
          Type: string
        - Name: request_bytes
          Type: bigint
        - Name: time_taken
          Type: float
        - Name: xforwarded_for
          Type: string
        - Name: ssl_protocol
          Type: string
        - Name: ssl_cipher
          Type: string
        - Name: response_result_type
          Type: string
        - Name: http_version
          Type: string
        - Name: fle_status
          Type: string
        - Name: fle_encrypted_fields
          Type: int
        - Name: c_port
          Type: int
        - Name: time_to_first_byte
          Type: float
        - Name: x_edge_detailed_result_type
          Type: string
        - Name: sc_content_type
          Type: string
        - Name: sc_content_len
          Type: bigint
        - Name: sc_range_start
          Type: bigint
        - Name: sc_range_end
          Type: bigint
      SerdeInfo:
        SerializationLibrary: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
        Parameters:
          serialization.format: "\t"
          field.delim: "\t"
      InputFormat:  org.apache.hadoop.mapred.TextInputFormat
      OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
      StoredAsSubDirectories: true
    PartitionKeys:
      - Name: distribution
        Type: string

S3アクセスログ

AWSのサイトにはありませんでしたが作成してみました。こちらは、パーティションの作成で工夫しています。 S3バケットは、アカウント数やサービス数が増えてくると事前に定義することが困難なことも多くあると思います。
そうした際には、projection.bucket.typeのところでInjected型を利用することでクエリの実行時に挿入することが可能です。 ただし、クエリの実行時のWHERE句で対象のパーティションの条件を指定しなければ実行エラーとなるので注意してください。

S3AccessLogsTable:
Type: AWS::Glue::Table
Properties:
  CatalogId: !Ref "AWS::AccountId"
  DatabaseName: [Glueデータベース名]
  TableInput:
    Name: s3_access_logs
    TableType: EXTERNAL_TABLE
    Parameters:
      compressionType: gzip
      typeOfData: file
      serialization.format: 1
      projection.enabled: true
      projection.bucket.type: injected
      storage.location.template: !Sub 's3://[バケット名]/AWSLogs/s3/${!bucket}'
    StorageDescriptor:
      Location: !Sub 's3://[バケット名]/AWSLogs/s3/'
      Columns:
        - Name: bucketowner
          Type: string
        - Name: bucket_name
          Type: string
        - Name: requestdatetime
          Type: string
        - Name: remoteip
          Type: string
        - Name: requester
          Type: string
        - Name: requestid
          Type: string
        - Name: operation
          Type: string
        - Name: key
          Type: string
        - Name: request_uri
          Type: string
        - Name: httpstatus
          Type: string
        - Name: errorcode
          Type: string
        - Name: bytessent
          Type: bigint
        - Name: objectsize
          Type: bigint
        - Name: totaltime
          Type: string
        - Name: turnaroundtime
          Type: string
        - Name: referrer
          Type: string
        - Name: useragent
          Type: string
        - Name: versionid
          Type: string
        - Name: hostid
          Type: string
        - Name: sigv
          Type: string
        - Name: ciphersuite
          Type: string
        - Name: authtype
          Type: string
        - Name: endpoint
          Type: string
        - Name: tlsversion
          Type: string
      SerdeInfo:
        SerializationLibrary: org.apache.hadoop.hive.serde2.RegexSerDe
        Parameters:
          input.regex: "([^ ]*) ([^ ]*) \\[(.*?)\\] ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) (\"[^\"]*\"|-) (-|[0-9]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) (\"[^\"]*\"|-) ([^ ]*)(?: ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*))?.*$"
      InputFormat:  org.apache.hadoop.mapred.TextInputFormat
      OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
      StoredAsSubDirectories: true
    PartitionKeys:
      - Name: bucket
        Type: string

ちょっと困ったことが….

CloudFrontアクセスログやS3アクセスログは、日付要素がログファイル名に入っており、Athenaのパーティション設定に必要な ディレクトリの形式となっていないのでパーティション設定ができないようです。ログ量が増えたときは、 「すべてのログを走査してから検索するしかないのかな」と少し諦めかけていたのですが、どうやらAthenaには便利な機能があるようです。

SELECTクエリで "$path"を使用することで対象のレコードデータが入っているファイル名を参照することができるようです。 関数と組み合わせたりWHERE句と組み合わせたりと、活用方法は色々ありそうですね。

Enjoy….


参照