ToC
セキュリティログの活用
セキュリティインシデントの発生時の対応や各種証跡の保全のためにログを取得することは、これまでも一般的に行われてきました。 特にクラウドでは、セキュリティに関する各種のログが簡単に取得できる各種のサービスやツールが公開されており、企業や個人でも活用されている方は多いと思います。
その一方で、データ量も多く、万一のために取得しているという認識から、なかなか活用に至らないケースも多くあるのではないでしょうか。 昨今では、発見的統制(Detective Control)という言葉も出てきており、セキュリティの問題と潜在的な脅威を見つけ出すことも必要となっていると言われております。
問題の発見には、簡単にログを検索して集計する必要があるため、AWSのAthenaによる各種セキュリティログ検索を実施してみます。
検索対象のログ
AWSの機能を利用することで、ほぼ自動的にS3上に蓄積されるログの検索をAthenaで実施してみたいと思います。
内容はAWSのサービスログのクエリとして、すでに公開されているため、同じことを実施しても芸が無いので、
この内容をCloud Formation
にて構成管理できる状態を作ることを試みました。
CloudTrailログ
特に変わった部分はありませんが、強いていうとすればAWS Glue Data Catalogを利用する際には、
Catalog Id
にはAWSアカウントIDを入力するようです。また、Athenaパーティションを都度構築することは手間なので、
パーティション射影の機能を活用しました。
CloudTrailLogsTable:
Type: AWS::Glue::Table
Properties:
CatalogId: !Ref "AWS::AccountId"
DatabaseName: [Glueデータベース名]
TableInput:
Name: cloud_trail_logs
TableType: EXTERNAL_TABLE
Parameters:
classification: cloudtrail
compressionType: gzip
typeOfData: file
serialization.format: 1
projection.enabled: true
projection.account.type: enum
projection.account.values: [検索対象のAWSアカウントID]
projection.region.type: enum
projection.region.values: us-west-2,af-south-1,ap-east-1,ap-south-1,ap-northeast-2,ap-southeast-1,ap-southeast-2,ap-northeast-1,ca-central-1,eu-central-1,eu-west-1,eu-west-2,eu-south-1,eu-west-3,eu-north-1,me-south-1,sa-east-1
projection.dt.type: date
projection.dt.range: NOW-4MONTHS,NOW
projection.dt.format: yyyy/MM/dd
projection.dt.interval: 1
projection.dt.interval.unit: DAYS
storage.location.template: !Sub 's3://[バケット名]/AWSLogs/${!account}/CloudTrail/${!region}/${!dt}'
StorageDescriptor:
Location: !Sub 's3://[バケット名]/AWSLogs/'
Columns:
- Name: eventversion
Type: string
- Name: useridentity
Type: "struct<type:string,principalid:string,arn:string,accountid:string,invokedby:string,accesskeyid:string,username:string,sessioncontext:struct<attributes:struct<mfaauthenticated:string,creationdate:string>,sessionissuer:struct<type:string,principalid:string,arn:string,accountid:string,username:string>>>"
- Name: eventtime
Type: string
- Name: eventtype
Type: string
- Name: eventsource
Type: string
- Name: eventname
Type: string
- Name: awsregion
Type: string
- Name: sourceipaddress
Type: string
- Name: useragent
Type: string
- Name: errorcode
Type: string
- Name: errormessage
Type: string
- Name: requestparameters
Type: string
- Name: responseelements
Type: string
- Name: additionaleventdata
Type: string
- Name: requestid
Type: string
- Name: eventid
Type: string
- Name: resources
Type: "array<struct<arn:string,accountid:string,type:string>>"
- Name: apiversion
Type: string
- Name: readonly
Type: string
- Name: recipientaccountid
Type: string
- Name: serviceeventdetails
Type: string
- Name: sharedeventid
Type: string
- Name: vpcendpointid
Type: string
SerdeInfo:
SerializationLibrary: com.amazon.emr.hive.serde.CloudTrailSerde
Parameters:
serialization.format: 1
InputFormat: com.amazon.emr.cloudtrail.CloudTrailInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
StoredAsSubDirectories: true
PartitionKeys:
- Name: account
Type: string
- Name: region
Type: string
- Name: dt
Type: string
CloudFrontアクセスログ
こちらもCloudTrailログとほぼ同様ですが、ちょっとしたテクニックを活用しています。
storage.location.template
の箇所は、パーティションを${パーティション名}
の形式で設定する必要がありますが、
同時に!Sub
でバケット名を置換することも多いと思います。そういった際にCloudformationでは、!
を付けてつけることで
エスケープされて置換対象となりません。
CloudFrontAccessLogsTable:
Type: AWS::Glue::Table
Properties:
CatalogId: !Ref "AWS::AccountId"
DatabaseName: [Glueデータベース名]
TableInput:
Name: cloud_front_access_logs
TableType: EXTERNAL_TABLE
Parameters:
compressionType: gzip
typeOfData: file
serialization.format: 1
projection.enabled: true
skip.header.line.count: 2
projection.distribution.type: injected
storage.location.template: !Sub 's3://[バケット名]/AWSLogs/cloudfront/${!distribution}'
StorageDescriptor:
Location: !Sub 's3://[バケット名]/AWSLogs/cloudfront/'
Columns:
- Name: request_date
Type: date
- Name: time
Type: string
- Name: location
Type: string
- Name: bytes
Type: bigint
- Name: request_ip
Type: string
- Name: method
Type: string
- Name: host
Type: string
- Name: uri
Type: string
- Name: status
Type: int
- Name: referrer
Type: string
- Name: user_agent
Type: string
- Name: query_string
Type: string
- Name: cookie
Type: string
- Name: result_type
Type: string
- Name: request_id
Type: string
- Name: host_header
Type: string
- Name: request_protocol
Type: string
- Name: request_bytes
Type: bigint
- Name: time_taken
Type: float
- Name: xforwarded_for
Type: string
- Name: ssl_protocol
Type: string
- Name: ssl_cipher
Type: string
- Name: response_result_type
Type: string
- Name: http_version
Type: string
- Name: fle_status
Type: string
- Name: fle_encrypted_fields
Type: int
- Name: c_port
Type: int
- Name: time_to_first_byte
Type: float
- Name: x_edge_detailed_result_type
Type: string
- Name: sc_content_type
Type: string
- Name: sc_content_len
Type: bigint
- Name: sc_range_start
Type: bigint
- Name: sc_range_end
Type: bigint
SerdeInfo:
SerializationLibrary: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Parameters:
serialization.format: "\t"
field.delim: "\t"
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
StoredAsSubDirectories: true
PartitionKeys:
- Name: distribution
Type: string
S3アクセスログ
AWSのサイトにはありませんでしたが作成してみました。こちらは、パーティションの作成で工夫しています。
S3バケットは、アカウント数やサービス数が増えてくると事前に定義することが困難なことも多くあると思います。
そうした際には、projection.bucket.type
のところでInjected型を利用することでクエリの実行時に挿入することが可能です。
ただし、クエリの実行時のWHERE句
で対象のパーティションの条件を指定しなければ実行エラーとなるので注意してください。
S3AccessLogsTable:
Type: AWS::Glue::Table
Properties:
CatalogId: !Ref "AWS::AccountId"
DatabaseName: [Glueデータベース名]
TableInput:
Name: s3_access_logs
TableType: EXTERNAL_TABLE
Parameters:
compressionType: gzip
typeOfData: file
serialization.format: 1
projection.enabled: true
projection.bucket.type: injected
storage.location.template: !Sub 's3://[バケット名]/AWSLogs/s3/${!bucket}'
StorageDescriptor:
Location: !Sub 's3://[バケット名]/AWSLogs/s3/'
Columns:
- Name: bucketowner
Type: string
- Name: bucket_name
Type: string
- Name: requestdatetime
Type: string
- Name: remoteip
Type: string
- Name: requester
Type: string
- Name: requestid
Type: string
- Name: operation
Type: string
- Name: key
Type: string
- Name: request_uri
Type: string
- Name: httpstatus
Type: string
- Name: errorcode
Type: string
- Name: bytessent
Type: bigint
- Name: objectsize
Type: bigint
- Name: totaltime
Type: string
- Name: turnaroundtime
Type: string
- Name: referrer
Type: string
- Name: useragent
Type: string
- Name: versionid
Type: string
- Name: hostid
Type: string
- Name: sigv
Type: string
- Name: ciphersuite
Type: string
- Name: authtype
Type: string
- Name: endpoint
Type: string
- Name: tlsversion
Type: string
SerdeInfo:
SerializationLibrary: org.apache.hadoop.hive.serde2.RegexSerDe
Parameters:
input.regex: "([^ ]*) ([^ ]*) \\[(.*?)\\] ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) (\"[^\"]*\"|-) (-|[0-9]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) (\"[^\"]*\"|-) ([^ ]*)(?: ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*))?.*$"
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
StoredAsSubDirectories: true
PartitionKeys:
- Name: bucket
Type: string
ちょっと困ったことが….
CloudFrontアクセスログやS3アクセスログは、日付要素
がログファイル名に入っており、Athenaのパーティション設定に必要な
ディレクトリの形式となっていないのでパーティション設定ができないようです。ログ量が増えたときは、
「すべてのログを走査してから検索するしかないのかな」と少し諦めかけていたのですが、どうやらAthenaには便利な機能があるようです。
SELECTクエリで "$path"
を使用することで対象のレコードデータが入っているファイル名を参照することができるようです。
関数と組み合わせたりWHERE句と組み合わせたりと、活用方法は色々ありそうですね。
Enjoy….
参照
- AWS:発見的統制(Detective controls)
- AWS:AWSのサービスログのクエリ
- AWS:Amazon Athena でのパーティション射影
- AWS:パーティション射影用にサポートされている型
- AWS:Amazon S3 内にあるソースデータのファイルの場所の取得