AWSTemplateFormatVersion: '2010-09-09' Description: 'Template creates Athena Database and View for VPC Flow Logs' Parameters: AthenaQueryResultBucketArn: Type: String Description: The ARN of the Amazon S3 bucket to which Athena query results are stored. e.g. 'arn:aws:s3:::aws-athena-query-results-us-east-1-XXXXXXXXXXXXXX' Default: '' AthenaResultsOutputLocation: Type: String Description: URI path of the Amazon S3 bucket where Athena query results are stored. VpcFlowLogsBucketName: Type: String Description: Name of the Amazon S3 bucket where vpc flow logs are stored. e.g. my-flow-logs Default: '' VpcFlowLogsS3BucketLocation: Type: String Description: URI path of Amazon S3 bucket folder where VPC Flow logs files are stored e.g. Location='s3://my-vpc-flow-logs-bucket/vpc-flow-logs-enh-parquet/AWSLogs/' VpcFlowLogsFilePrefix: Description: (Optional) - The log file prefix in Amazon S3 bucket that comes right after s3 bucket name e.g. 'vpc-flow-logs-enh-parquet' Type: String Default: '' VpcFlowLogsAthenaDatabaseName: Type: String Description: (Optional) If you leave this blank this template will create an Athena database where external table for VPC flow logs is created. If you have already created a DB then external table will be created under existing DB. VpcFlowLogsAthenaTableName: Type: String Description: (Optional) If you leave this blank this template will create new Athena external table name for VPC flow logs otherwise it will use the existing table name you provided. HiveCompatibleS3prefix: Type: String Description: Adds prefixes of partition keys in s3 object key (Hive-compatible S3 prefix) AllowedValues: - true Default: true S3BucketRegion: Type: String Description: Region of the S3 bucket created in the central account. e.g. us-east-1 Conditions: VpcLogsAthenaDataBaseCondition: !Equals [!Ref VpcFlowLogsAthenaDatabaseName, ''] ExistingVpcLogsAthenaDataBaseCondition: !Not [!Equals [!Ref VpcFlowLogsAthenaDatabaseName, '']] VpcLogsAthenaTableCondition: !Equals [!Ref VpcFlowLogsAthenaTableName, ''] ExistingVpcLogsAthenaTableCondition: !Not [!Equals [!Ref VpcFlowLogsAthenaTableName, '']] Resources: # Creates a glue database for VPC Flow logs VpcFlowLogsAthenaDatabase: Condition: VpcLogsAthenaDataBaseCondition Type: AWS::Glue::Database Properties: DatabaseInput: Name: 'vpcflowlogsathenadatabase' CatalogId: !Ref AWS::AccountId # Create Athena external Table for VPC Flow Logs VpcFlowLogsAthenaTable: Condition: VpcLogsAthenaTableCondition Type: AWS::Glue::Table Properties: CatalogId: !Ref AWS::AccountId DatabaseName: !If [ExistingVpcLogsAthenaDataBaseCondition, !Ref VpcFlowLogsAthenaDatabaseName, !Sub '${VpcFlowLogsAthenaDatabase}'] TableInput: Description: This table has the schema for vpc flow logs information. Name: vpc_flow_logs_custom_integration PartitionKeys: - Name: aws-account-id Type: string - Name: aws-service Type: string - Name: aws-region Type: string - Name: year Type: string - Name: month Type: string - Name: day Type: string TableType: EXTERNAL_TABLE StorageDescriptor: Location: !Ref VpcFlowLogsS3BucketLocation InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat SerdeInfo: Parameters: skip.header.line.count: "1" EXTERNAL: "true" field.delim: ' ' serialization.format: ' ' SerializationLibrary: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe Columns: - Name: 'account_id' Type: string - Name: 'action' Type: string - Name: 'az_id' Type: string - Name: 'bytes' Type: bigint - Name: 'dstaddr' Type: string - Name: 'dstport' Type: int - Name: 'end' Type: bigint - Name: 'flow_direction' Type: string - Name: 'instance_id' Type: string - Name: 'interface_id' Type: string - Name: 'log_status' Type: string - Name: 'packets' Type: bigint - Name: 'pkt_dst_aws_service' Type: string - Name: 'pkt_dstaddr' Type: string - Name: 'pkt_src_aws_service' Type: string - Name: 'pkt_srcaddr' Type: string - Name: 'protocol' Type: bigint - Name: 'region' Type: string - Name: 'srcaddr' Type: string - Name: 'srcport' Type: int - Name: 'start' Type: bigint - Name: 'sublocation_id' Type: string - Name: 'sublocation_type' Type: string - Name: 'subnet_id' Type: string - Name: 'tcp_flags' Type: int - Name: 'traffic_path' Type: int - Name: 'type' Type: string - Name: 'version' Type: int - Name: 'vpc_id' Type: string # Creates an IAM role for lambda function execution permission VPCFlowLogsAthenaIntegrationLambdaExecutorRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: lambda.amazonaws.com Action: sts:AssumeRole ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Path: / Policies: - PolicyName: VPCFlowLogsAthenaIntegrationLambdaExecutorPolicy PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - s3:ListBucket - logs:CreateLogGroup Resource: - !Sub '${AthenaQueryResultBucketArn}' - !Sub 'arn:aws:s3:::${VpcFlowLogsBucketName}' - !Sub 'arn:aws:logs:${S3BucketRegion}:${AWS::AccountId}:*' - Effect: Allow Action: - s3:PutObject - s3:GetBucketLocation - s3:GetObject - logs:CreateLogStream - logs:PutLogEvents Resource: - !Sub '${AthenaQueryResultBucketArn}' - !Sub '${AthenaQueryResultBucketArn}/*' - !Sub 'arn:aws:logs:${S3BucketRegion}:${AWS::AccountId}:log-group:/aws/lambda/vpc_logs_lambda_handler:*' - Effect: Allow Action: - athena:GetQueryResults - athena:StartQueryExecution - athena:CreateNamedQuery - athena:GetQueryExecution Resource: - !Sub 'arn:aws:athena:*:${AWS::AccountId}:workgroup/primary' - Effect: Allow Action: - glue:GetDatabase - glue:GetTable - glue:CreateTable - glue:UpdateTable - glue:BatchCreatePartition - glue:CreatePartition - glue:UpdatePartition - glue:GetPartition Resource: - !Sub 'arn:aws:glue:${S3BucketRegion}:${AWS::AccountId}:catalog' - !If [ExistingVpcLogsAthenaDataBaseCondition, !Sub 'arn:aws:glue:${S3BucketRegion}:${AWS::AccountId}:database/${VpcFlowLogsAthenaDatabaseName}', !Sub 'arn:aws:glue:${S3BucketRegion}:${AWS::AccountId}:database/${VpcFlowLogsAthenaDatabase}'] - !If [ExistingVpcLogsAthenaDataBaseCondition, !Sub 'arn:aws:glue:${S3BucketRegion}:${AWS::AccountId}:table/${VpcFlowLogsAthenaDatabaseName}/${VpcFlowLogsAthenaTableName}', !Sub 'arn:aws:glue:${S3BucketRegion}:${AWS::AccountId}:table/${VpcFlowLogsAthenaDatabase}/${VpcFlowLogsAthenaTable}'] - !If [ExistingVpcLogsAthenaDataBaseCondition, !Sub 'arn:aws:glue:${S3BucketRegion}:${AWS::AccountId}:table/${VpcFlowLogsAthenaDatabaseName}/vpc_flow_logs_summary_view', !Sub 'arn:aws:glue:${S3BucketRegion}:${AWS::AccountId}:table/${VpcFlowLogsAthenaDatabase}/vpc_flow_logs_summary_view'] - !If [ExistingVpcLogsAthenaDataBaseCondition, !Sub 'arn:aws:glue:${S3BucketRegion}:${AWS::AccountId}:table/${VpcFlowLogsAthenaDatabaseName}/vpc_flow_logs_daily_view', !Sub 'arn:aws:glue:${S3BucketRegion}:${AWS::AccountId}:table/${VpcFlowLogsAthenaDatabase}/vpc_flow_logs_daily_view'] - !If [ExistingVpcLogsAthenaDataBaseCondition, !Sub 'arn:aws:glue:${S3BucketRegion}:${AWS::AccountId}:table/${VpcFlowLogsAthenaDatabaseName}/vpc_flow_logs_enhanced_view', !Sub 'arn:aws:glue:${S3BucketRegion}:${AWS::AccountId}:table/${VpcFlowLogsAthenaDatabase}/vpc_flow_logs_enhanced_view'] - Effect: Allow Action: - s3:GetObjectAcl - s3:GetObject - s3:GetObjectTagging - s3:GetBucketPolicy Resource: - !Sub 'arn:aws:s3:::${VpcFlowLogsBucketName}' - !Sub 'arn:aws:s3:::${VpcFlowLogsBucketName}/*' Tags: - Key: Name Value: VPCFlowLogs-Lambda-Role - Key: Purpose Value: WALabVPCFlowLogs # Create Athena view for VPC Flow Logs # Creates a lambda fuction to add partitions to VPC Flow Logs external table VPCAthenaPartitionsFunction: Type: AWS::Lambda::Function Properties: Description: Adds partitions to Athena table for current day. Triggered by Cloudwatch scheduler with daily frequency. Handler: index.lambda_handler Runtime: python3.8 Role: !GetAtt 'VPCFlowLogsAthenaIntegrationLambdaExecutorRole.Arn' Timeout: 30 Code: ZipFile: | import boto3 import datetime import time import re import cfnresponse #S3 and Athena client s3 = boto3.client('s3') athena = boto3.client('athena') #Get Year, Month, Day for partition date = datetime.datetime.now() athena_year = str(date.year) athena_month = str(date.month).rjust(2, '0') athena_day = str(date.day).rjust(2, '0') #Parameters for S3 log location and Athena table (Fill this carefully) s3_buckcet_flow_log = '' # '' s3_account_prefix = 'vpc-flow-logs/AWSLogs/' # '' e.g. 'vpc-flow-logs' s3_ouput = '' # e.g. 's3://aws-athena-query-results-us-east-1-' database = '' table_name = '' # '' #Executing the athena query: def run_query(query, database, s3_output): try: query_response = athena.start_query_execution( QueryString=query, QueryExecutionContext={ 'Database': database }, ResultConfiguration={ 'OutputLocation': s3_output, } ) execution_id=query_response['QueryExecutionId'] state = 'RUNNING' while (state in ['RUNNING', 'QUEUED']): response = athena.get_query_execution(QueryExecutionId=execution_id) if 'QueryExecution' in response and 'Status' in response['QueryExecution'] and 'State' in \ response['QueryExecution']['Status']: state = response['QueryExecution']['Status']['State'] if state == 'FAILED': print(response) print("state == FAILED") print('Execution ID: ' + query_response['QueryExecutionId']) return False elif state == 'SUCCEEDED': s3_path = response['QueryExecution']['ResultConfiguration']['OutputLocation'] filename = re.findall('.*\/(.*)', s3_path)[0] return filename time.sleep(1) except Exception as e: print("Query Exception:- ", e) return query_response #Function to get the regions and run the query on the captured regions def lambda_handler(event, context): errs = None status = cfnresponse.SUCCESS account_id=None region=None database = event["ResourceProperties"]["dbName"] if event.get("ResourceProperties") != None else event["dbName"] table_name = event["ResourceProperties"]["VPCTableName"] if event.get("ResourceProperties") != None else event["VPCTableName"] frequency = event["ResourceProperties"]["frequency"] if event.get("ResourceProperties") != None else event["frequency"] s3_buckcet_flow_log = event["ResourceProperties"]["VpcFlowLogsBucketName"] if event.get("ResourceProperties") != None else event["VpcFlowLogsBucketName"] s3_ouput = event["ResourceProperties"]["s3Output"] if event.get("ResourceProperties") != None else event["s3Output"] s3_account_prefix=event["ResourceProperties"]["VpcFlowLogsFilePrefix"] + "AWSLogs/" if event.get("ResourceProperties") != None else event["VpcFlowLogsFilePrefix"] + "AWSLogs/" hive_compatible_s3_prefix = event["ResourceProperties"]["HiveCompatibleS3prefix"] if event.get("ResourceProperties") != None else event["HiveCompatibleS3prefix"] account_result = s3.list_objects(Bucket=s3_buckcet_flow_log,Prefix=s3_account_prefix, Delimiter='/') if event.get("RequestType") == 'Delete': status = cfnresponse.SUCCESS cfnresponse.send(event, context, status, errs, event["LogicalResourceId"]) else: try: for accounts in account_result.get('CommonPrefixes'): get_account=(accounts.get('Prefix','').replace(s3_account_prefix,'').replace('/','')) if get_account.find("=") >0: account_id=get_account.split("=")[1] s3_prefix = s3_account_prefix + get_account + '/aws-service=vpcflowlogs/' s3_input = 's3://' + s3_buckcet_flow_log + '/' + s3_prefix result = s3.list_objects(Bucket=s3_buckcet_flow_log,Prefix=s3_prefix, Delimiter='/') for regions in result.get('CommonPrefixes'): get_region=(regions.get('Prefix','').replace(s3_prefix,'').replace('/','')) if get_region.find("=") >0: region=get_region.split("=")[1] query = str("ALTER TABLE "+ database + "." + table_name +" ADD PARTITION (`aws-account-id`=\"" + account_id + "\", `aws-service`=\"vpcflowlogs\", `aws-region`=\"" + region + "\", year=\"" + athena_year + "\", month=\"" + athena_month + "\", day=\"" + athena_day + "\"") if frequency == "Hourly": query += ", hour=\"00\")" else: query += ")" query += " location '" + s3_input + get_region + "/year=" + athena_year + "/month=" + athena_month + "/day=" + athena_day if frequency == "Hourly": query += "/hour=00';" else: query += "';" ## Enable below prints for debugging ## print("*" * 10, "START", "*" * 10) # -- for debug print(get_region) # -- for debug print(query) # -- for debug print("*" * 10, "START", "*" * 10) # -- for debug print(database) # -- for debug print(s3_ouput) # -- for debug query_result=run_query(query, database, s3_ouput) print(query_result) # -- for debug print("*" * 10, "END", "*" * 10) # -- for debug status = cfnresponse.SUCCESS except Exception as e: print("lambda_handler Exception:- ", e) errs = e status = cfnresponse.FAILED finally: if event.get("RequestType") != None: cfnresponse.send(event, context, status, {}, event.get("LogicalResourceId")) Tags: - Key: Name Value: VPCFlowLogs-Lambda-Function - Key: Purpose Value: WALabVPCFlowLogs # Cloudwatch event rule that will be triggered daily for adding partitions to vpc flow logs Athena table DailyAddPartitionLambdaEventsRule: Type: AWS::Events::Rule Properties: Description: Event rule to trigger lambda function that adds partition to athena table for vpc flow logs. Name: daily-add-vpc-logs-partitions ScheduleExpression: "rate(1 day)" State: ENABLED Targets: - Arn: !Sub ${VPCAthenaPartitionsFunction.Arn} Id: DailyAddPartitionLambdaEventsRule Input: !If [ExistingVpcLogsAthenaDataBaseCondition, !Sub '{"frequency": "Daily", "dbName": "${VpcFlowLogsAthenaDatabaseName}", "VPCTableName": "${VpcFlowLogsAthenaTableName}", "s3Output": "${AthenaResultsOutputLocation}", "VpcFlowLogsBucketName": "${VpcFlowLogsBucketName}", "VpcFlowLogsFilePrefix": "${VpcFlowLogsFilePrefix}", "HiveCompatibleS3prefix": "${HiveCompatibleS3prefix}"}', !Sub '{"frequency": "Daily", "dbName": "${VpcFlowLogsAthenaDatabase}", "VPCTableName": "${VpcFlowLogsAthenaTable}", "s3Output": "${AthenaResultsOutputLocation}", "VpcFlowLogsBucketName": "${VpcFlowLogsBucketName}", "VpcFlowLogsFilePrefix": "${VpcFlowLogsFilePrefix}", "HiveCompatibleS3prefix": "${HiveCompatibleS3prefix}"}'] # Creates permission for cloudwatch events to execute lambda function PermissionForEventsToInvokeLambda: Type: AWS::Lambda::Permission Properties: FunctionName: !Sub ${VPCAthenaPartitionsFunction.Arn} Action: "lambda:InvokeFunction" Principal: "events.amazonaws.com" SourceArn: Fn::GetAtt: - "DailyAddPartitionLambdaEventsRule" - "Arn" # Creates an initializer trigger Athena partition lambda function invokation CreatePartitionInitializer: Type: 'Custom::VPCFlowLogsAthenaPartitionInitializer' DependsOn: - VPCAthenaPartitionsFunction Properties: ServiceToken: !GetAtt VPCAthenaPartitionsFunction.Arn dbName: !If [ExistingVpcLogsAthenaDataBaseCondition, !Ref VpcFlowLogsAthenaDatabaseName, !Sub '${VpcFlowLogsAthenaDatabase}'] VPCTableName: !If [ExistingVpcLogsAthenaTableCondition, !Ref VpcFlowLogsAthenaTableName, !Ref VpcFlowLogsAthenaTable] service: vpcflowlogs frequency: "Daily" s3Output: !Ref AthenaResultsOutputLocation VpcFlowLogsBucketName: !Ref VpcFlowLogsBucketName VpcFlowLogsFilePrefix: !Ref VpcFlowLogsFilePrefix HiveCompatibleS3prefix: !Ref HiveCompatibleS3prefix # Creates Athena View for VPC Flow Logs VPCAthenaViewFunction: Type: AWS::Lambda::Function Properties: Description: Creates Athena view for VPC FLow Logs external table Handler: index.lambda_handler Runtime: python3.8 Role: !GetAtt 'VPCFlowLogsAthenaIntegrationLambdaExecutorRole.Arn' Timeout: 60 Code: ZipFile: | import boto3 import datetime import time import re import cfnresponse # S3 and Athena client s3 = boto3.client('s3') athena = boto3.client('athena') #Executing the athena query: def run_query(query, database, s3_output): try: query_response = athena.start_query_execution( QueryString=query, QueryExecutionContext={ 'Database': database }, ResultConfiguration={ 'OutputLocation': s3_output, } ) execution_id=query_response['QueryExecutionId'] state = 'RUNNING' while (state in ['RUNNING', 'QUEUED']): response = athena.get_query_execution(QueryExecutionId=execution_id) if 'QueryExecution' in response and 'Status' in response['QueryExecution'] and 'State' in \ response['QueryExecution']['Status']: state = response['QueryExecution']['Status']['State'] if state == 'FAILED': print(response) print("state == FAILED") print('Execution ID: ' + query_response['QueryExecutionId']) return False elif state == 'SUCCEEDED': s3_path = response['QueryExecution']['ResultConfiguration']['OutputLocation'] filename = re.findall('.*\/(.*)', s3_path)[0] return filename time.sleep(1) except Exception as e: print("Query Exception:- ", e) return query_response #Function to get the regions and run the query on the captured regions def lambda_handler(event, context): errs = None status = cfnresponse.SUCCESS database = event["ResourceProperties"]["athenaIntegrations"][0]["database"] s3_output = event["ResourceProperties"]["athenaIntegrations"][0]["s3_output"] vpc_table_name = event["ResourceProperties"]["athenaIntegrations"][0]["vpc_table_name"] if event.get("RequestType") == 'Delete': status = cfnresponse.SUCCESS cfnresponse.send(event, context, status, errs, event["LogicalResourceId"]) else: try: ## Enable below prints for debugging ## # print("*" * 10, "START", "*" * 10) # -- for debug summary_query = summary_view_query(vpc_table_name) daily_query = daily_view_query(vpc_table_name) enh_query = enhanced_view_query(vpc_table_name) print(summary_query) # -- for debug print(daily_query) # -- for debug print(enh_query) # -- for debug #print(database) # -- for debug #print(s3_output) # -- for debug query_result=run_query(summary_query, database, s3_output) # print(query_result) # -- for debug query_result=run_query(daily_query, database, s3_output) # print(query_result) # -- for debug query_result=run_query(enh_query, database, s3_output) # print(query_result) # -- for debug # print("*" * 10, "END", "*" * 10) # -- for debug status = cfnresponse.SUCCESS except Exception as e: print("Query Exception:- ", e) errs = e status = cfnresponse.FAILED finally: status = cfnresponse.SUCCESS if event.get("RequestType") != None: cfnresponse.send(event, context, status, {}, event.get("LogicalResourceId")) def summary_view_query(vpc_table_name): query = str("CREATE OR REPLACE VIEW vpc_flow_logs_summary_view AS SELECT" + " \"account_id\" \"accountid\" " + ", \"interface_id\" \"interfaceid\" " + ", \"srcaddr\" \"sourceaddress\" " + ", \"dstaddr\" \"destinationaddress\" " + ", \"srcport\" \"sourceport\" " + ", \"dstport\" \"destinationport\" " + ", \"protocol\" \"protocol\" " + ", \"sum\"(\"packets\") \"numpackets\" " + ", \"sum\"(\"bytes\") \"numbytes\" " + ", \"action\" \"action\" " + ", \"count\"(\"action\") \"action_count\" " + ", \"log_status\" \"logstatus\" " + ", \"count\"(\"log_status\") \"log_status_count\" " + ", \"vpc_id\" \"vpcid\" " + ", \"az_id\" \"azid\" " + ", \"pkt_srcaddr\" \"packetsourceaddr\" " + ", \"pkt_dstaddr\" \"packetdestinationaddr\" " + ", \"region\" \"region\" " + ", \"subnet_id\" \"subnetid\" " + ", (CASE WHEN (CAST(\"tcp_flags\" AS varchar) = '1') THEN 'FIN' WHEN (CAST(\"tcp_flags\" AS varchar) = '2') THEN 'SYN' WHEN (CAST(\"tcp_flags\" AS varchar) = '3') THEN 'SYN-FIN' WHEN (CAST(\"tcp_flags\" AS varchar) = '4') THEN 'RST' WHEN (CAST(\"tcp_flags\" AS varchar) = '8') THEN 'PSH' WHEN (CAST(\"tcp_flags\" AS varchar) = '16') THEN 'ACK' WHEN (CAST(\"tcp_flags\" AS varchar) = '18') THEN 'SYN-ACK' WHEN (CAST(\"tcp_flags\" AS varchar) = '19') THEN 'SYN-ACK-FIN' WHEN (CAST(\"tcp_flags\" AS varchar) = '32') THEN 'URG' ELSE CAST(\"tcp_flags\" AS varchar) END) \"tcpflags\" " + ", \"tcp_flags\" \"tcp_flags\" " + ", \"count\"(\"tcp_flags\") \"tcp_flags_count\" " + ", \"flow_direction\" \"flowdirection\" " + ", \"pkt_src_aws_service\" \"packetsrcawsservice\" " + ", \"pkt_dst_aws_service\" \"packetdstawsservice\" " + ", (CASE WHEN (CAST(\"traffic_path\" AS varchar) = '1') THEN 'ResourceInSameVPC' WHEN (CAST(\"traffic_path\" AS varchar) = '2') THEN 'IGW-OR-GatewayVPCEndpoint' WHEN (CAST(\"traffic_path\" AS varchar) = '3') THEN 'VirtualPrivateGateway' WHEN (CAST(\"traffic_path\" AS varchar) = '4') THEN 'Intra-RegionVPCPeering' WHEN (CAST(\"traffic_path\" AS varchar) = '5') THEN 'Inter-RegionVPCPeering' WHEN (CAST(\"traffic_path\" AS varchar) = '6') THEN 'LocalGateway' WHEN (CAST(\"traffic_path\" AS varchar) = '7') THEN 'GatewayVPCEndpoint' WHEN (CAST(\"traffic_path\" AS varchar) = '8') THEN 'InternetGateway' ELSE CAST(\"traffic_path\" AS varchar) END) \"trafficpath\" " + ", \"traffic_path\" \"traffic_path\" " + ", \"type\" \"iptype\" " + "FROM " + vpc_table_name + " WHERE (((year = format_datetime(current_timestamp, 'YYYY')) AND (month = format_datetime(current_timestamp, 'MM'))) OR ((year = format_datetime((date_trunc('month', current_timestamp) - INTERVAL '1' MONTH), 'YYYY')) AND (month = format_datetime((date_trunc('month', current_timestamp) - INTERVAL '1' MONTH), 'MM')))) " + "GROUP BY account_id, interface_id, srcaddr, dstaddr, srcport, dstport, protocol, action, log_status, vpc_id, az_id, pkt_srcaddr, pkt_dstaddr, region, subnet_id, tcp_flags, flow_direction, pkt_src_aws_service, pkt_dst_aws_service, traffic_path, type;") return query def daily_view_query(vpc_table_name): query = str("CREATE OR REPLACE VIEW vpc_flow_logs_daily_view AS " + " SELECT " + " \"account_id\" \"accountid\" " + " , \"interface_id\" \"interfaceid\" " + " , \"srcaddr\" \"sourceaddress\" " + " , \"dstaddr\" \"destinationaddress\" " + " , \"srcport\" \"sourceport\" " + " , \"dstport\" \"destinationport\" " + " , \"protocol\" \"protocol\" " + " , \"sum\"(\"packets\") \"numpackets\" " + " , \"sum\"(\"bytes\") \"numbytes\" " + " , \"start\" \"starttime\" " + " , \"end\" \"endtime\" " + " , \"action\" \"action\" " + " , \"count\"(\"action\") \"action_count\" " + " , \"log_status\" \"logstatus\" " + " , \"vpc_id\" \"vpcid\" " + " , \"az_id\" \"azid\" " + " , \"pkt_srcaddr\" \"packetsourceaddr\" " + " , \"pkt_dstaddr\" \"packetdestinationaddr\" " + " , \"region\" \"region\" " + " , \"subnet_id\" \"subnetid\" " + " , (CASE WHEN (CAST(\"tcp_flags\" AS varchar) = '1') THEN 'FIN' WHEN (CAST(\"tcp_flags\" AS varchar) = '2') THEN 'SYN' WHEN (CAST(\"tcp_flags\" AS varchar) = '3') THEN 'SYN-FIN' WHEN (CAST(\"tcp_flags\" AS varchar) = '4') THEN 'RST' WHEN (CAST(\"tcp_flags\" AS varchar) = '8') THEN 'PSH' WHEN (CAST(\"tcp_flags\" AS varchar) = '16') THEN 'ACK' WHEN (CAST(\"tcp_flags\" AS varchar) = '18') THEN 'SYN-ACK' WHEN (CAST(\"tcp_flags\" AS varchar) = '19') THEN 'SYN-ACK-FIN' WHEN (CAST(\"tcp_flags\" AS varchar) = '32') THEN 'URG' ELSE CAST(\"tcp_flags\" AS varchar) END) \"tcpflags\" " + " , \"tcp_flags\" \"tcp_flags\" " + " , \"flow_direction\" \"flowdirection\" " + " , (CASE WHEN (CAST(\"traffic_path\" AS varchar) = '1') THEN 'ResourceInSameVPC' WHEN (CAST(\"traffic_path\" AS varchar) = '2') THEN 'IGW-OR-GatewayVPCEndpoint' WHEN (CAST(\"traffic_path\" AS varchar) = '3') THEN 'VirtualPrivateGateway' WHEN (CAST(\"traffic_path\" AS varchar) = '4') THEN 'Intra-RegionVPCPeering' WHEN (CAST(\"traffic_path\" AS varchar) = '5') THEN 'Inter-RegionVPCPeering' WHEN (CAST(\"traffic_path\" AS varchar) = '6') THEN 'LocalGateway' WHEN (CAST(\"traffic_path\" AS varchar) = '7') THEN 'GatewayVPCEndpoint' WHEN (CAST(\"traffic_path\" AS varchar) = '8') THEN 'InternetGateway' ELSE CAST(\"traffic_path\" AS varchar) END) \"trafficpath\" " + " , \"traffic_path\" \"traffic_path\" " + " , \"type\" \"iptype\" " + " , CAST(\"date_format\"(\"from_unixtime\"(\"start\"), '%d') AS integer) \"startday\" " + " , CAST(\"date_format\"(\"from_unixtime\"(\"start\"), '%m') AS integer) \"startmonth\" " + " , CAST(\"date_format\"(\"from_unixtime\"(\"end\"), '%d') AS integer) \"endday\" " + " , CAST(\"date_format\"(\"from_unixtime\"(\"end\"), '%m') AS integer) \"endmonth\" " + " FROM " + vpc_table_name + " WHERE (((year = format_datetime(current_timestamp, 'YYYY')) AND (month = format_datetime(current_timestamp, 'MM'))) OR ((year = format_datetime((date_trunc('month', current_timestamp) - INTERVAL '1' MONTH), 'YYYY')) AND (month = format_datetime((date_trunc('month', current_timestamp) - INTERVAL '1' MONTH), 'MM')))) " + "GROUP BY \"account_id\", \"interface_id\", \"srcaddr\", \"dstaddr\", \"srcport\", \"dstport\", \"protocol\", \"start\", \"end\", \"action\", \"log_status\", \"vpc_id\", \"az_id\", \"pkt_srcaddr\", \"pkt_dstaddr\", \"region\", \"subnet_id\", \"tcp_flags\", \"flow_direction\", \"traffic_path\", \"type\" ORDER BY startmonth ASC, startday ASC;") return query def enhanced_view_query(vpc_table_name): query =str("CREATE OR REPLACE VIEW vpc_flow_logs_enhanced_view AS " + " SELECT " + " \"account_id\" \"accountid\" " + " , \"srcaddr\" \"sourceaddress\" " + " , \"dstaddr\" \"destinationaddress\" " + " , \"sum\"(\"packets\") \"numpackets\" " + " , \"sum\"(\"bytes\") \"numbytes\" " + " , \"vpc_id\" \"vpcid\" " + " , \"az_id\" \"azid\" " + " , \"pkt_srcaddr\" \"packetsourceaddr\" " + " , \"pkt_dstaddr\" \"packetdestinationaddr\" " + " , \"region\" \"region\" " + " , \"subnet_id\" \"subnetid\" " + " , \"flow_direction\" \"flowdirection\" " + " , \"pkt_src_aws_service\" \"packetsrcawsservice\" " + " , \"pkt_dst_aws_service\" \"packetdstawsservice\" " + " , (CASE WHEN (CAST(\"traffic_path\" AS varchar) = '1') THEN 'ResourceInSameVPC' WHEN (CAST(\"traffic_path\" AS varchar) = '2') THEN 'IGW-OR-GatewayVPCEndpoint' WHEN (CAST(\"traffic_path\" AS varchar) = '3') THEN 'VirtualPrivateGateway' WHEN (CAST(\"traffic_path\" AS varchar) = '4') THEN 'Intra-RegionVPCPeering' WHEN (CAST(\"traffic_path\" AS varchar) = '5') THEN 'Inter-RegionVPCPeering' WHEN (CAST(\"traffic_path\" AS varchar) = '6') THEN 'LocalGateway' WHEN (CAST(\"traffic_path\" AS varchar) = '7') THEN 'GatewayVPCEndpoint' WHEN (CAST(\"traffic_path\" AS varchar) = '8') THEN 'InternetGateway' ELSE CAST(\"traffic_path\" AS varchar) END) \"trafficpath\" " + " , \"traffic_path\" \"traffic_path\" " + " FROM " + vpc_table_name + " WHERE (((((year = format_datetime(current_timestamp, 'YYYY')) AND (month = format_datetime(current_timestamp, 'MM'))) OR ((year = format_datetime((date_trunc('month', current_timestamp) - INTERVAL '1' MONTH), 'YYYY')) AND (month = format_datetime((date_trunc('month', current_timestamp) - INTERVAL '1' MONTH), 'MM')))) AND (((pkt_src_aws_service <> '') OR (pkt_dst_aws_service <> '')) OR (traffic_path >= 0))) AND ((pkt_src_aws_service <> '-') OR (pkt_dst_aws_service <> '-'))) " + " GROUP BY \"account_id\", \"srcaddr\", \"dstaddr\", \"dstport\", \"protocol\", \"vpc_id\", \"az_id\", \"pkt_srcaddr\", \"pkt_dstaddr\", \"region\", \"subnet_id\", \"flow_direction\", \"pkt_src_aws_service\", \"pkt_dst_aws_service\", \"traffic_path\";") return query Tags: - Key: Name Value: VPCFlowLogs-Lambda-Function - Key: Purpose Value: WALabVPCFlowLogs # Creates an initializer trigger to Athena View lambda function invokation CreateAthenaViewInitializer: Type: 'Custom::VPCFlowLogsAthenaViewStartInitializer' DependsOn: - VPCAthenaViewFunction Properties: ServiceToken: !GetAtt VPCAthenaViewFunction.Arn dbName: !If [ExistingVpcLogsAthenaDataBaseCondition, !Ref VpcFlowLogsAthenaDatabaseName, !Sub '${VpcFlowLogsAthenaDatabase}'] service: vpcflowlogs athenaIntegrations: - s3_output: !Ref AthenaResultsOutputLocation database: !If [ExistingVpcLogsAthenaDataBaseCondition, !Ref VpcFlowLogsAthenaDatabaseName, !Sub '${VpcFlowLogsAthenaDatabase}'] vpc_table_name: !If [ExistingVpcLogsAthenaTableCondition, !Ref VpcFlowLogsAthenaTableName, !Ref VpcFlowLogsAthenaTable] Outputs: StackName: Description: 'Stack name' Value: !Sub '${AWS::StackName}'