Skip to content

Commit

Permalink
Config option to silently ignore records with "none" disposition
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesJJ committed Dec 14, 2022
1 parent 635c0b5 commit 1f67ceb
Show file tree
Hide file tree
Showing 14 changed files with 98 additions and 61 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,17 @@ Configuration is specified using command line flags or environment variables. Th
SQS slow poll timeout, 1-20 [DMARC_POLLTIMEOUT] (default 10)
-sqs string
Name of the SQS queue to poll [MANDATORY] [DMARC_SQS]
Name of the SQS queue to poll [MANDATORY] [DMARC_SQS]
-sqsprocessingtime int
SQS visibility timeout [DO NOT CHANGE] [DMARC_SQSPROCESSINGTIME] (default 3600)
-sqsregion string
AWS region of SQS queue [MANDATORY] [DMARC_SQSREGION]
-excludedispositionnone
Silently discard records where having a disposition value of "none" [DMARC_EXCLUDEDISPOSITIONNONE]
-verbose
Show detailed information during run [DMARC_VERBOSE]
```
Expand Down
2 changes: 1 addition & 1 deletion aws-serverless/deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ GOOS=linux GOARCH=amd64 go build -o main
cd -

mv "${FUNC_DIR_RELATIVE}/main" ./
sam deploy -g --tags "tenant=fcg"
sam deploy --tags "project=dmarc_report" $@
rm -f ./main


Expand Down
2 changes: 1 addition & 1 deletion aws-serverless/samconfig.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ s3_prefix = "dmarc-report-parser"
region = "us-west-2"
confirm_changeset = true
capabilities = "CAPABILITY_IAM"
parameter_overrides = "DELETESQS=\"true\" EMPTYPOLLS=\"1\" MOVE=\"archive-dmarc\" VERBOSE=\"false\""
parameter_overrides = "EXCLUDEDISPOSITIONNONE=\"false\" DELETESQS=\"true\" EMPTYPOLLS=\"1\" MOVE=\"archive-dmarc\" VERBOSE=\"false\""
4 changes: 4 additions & 0 deletions aws-serverless/template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ Parameters:
VERBOSE:
Type: String
Default: "false"
EXCLUDEDISPOSITIONNONE:
Type: String
Default: "false"
Resources:
SqsQueue:
Type: AWS::SQS::Queue
Expand Down Expand Up @@ -70,6 +73,7 @@ Resources:
DMARC_SQS: !GetAtt [ "SqsQueue", "QueueName" ]
DMARC_SQSREGION: !Ref AWS::Region
DMARC_VERBOSE: !Ref VERBOSE
DMARC_EXCLUDEDISPOSITIONNONE: !Ref EXCLUDEDISPOSITIONNONE
Events:
Schedule:
Type: Schedule
Expand Down
3 changes: 3 additions & 0 deletions extract-from-email.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ func DelimitedAppend(delimiter string, sb *strings.Builder, s string) {
func flattenDMARCRecord(msgToFirstAddress string, report *Feedback) *CsvRows {
var rows CsvRows
for rrIndex, rrRecord := range report.Records {
if *conf.excludeDispositionNone && strings.EqualFold(rrRecord.Row.Policy.Disposition, "none") {
continue
}
singleRow, err := generateDMARCRow(msgToFirstAddress, &report.Metadata, &report.Policy, &rrRecord)
if err != nil {
Error.Printf("flattenDMARCRecord Error: %v", err)
Expand Down
12 changes: 6 additions & 6 deletions extract-from-email_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,24 +44,24 @@ func loadTestData(t *testing.T, filename string) *bytes.Reader {
func TestReadMailToRows(t *testing.T) {

fileTests := []FileTest{
FileTest{
{
File: "./test-data/mail-with-bad-gzip.txt",
ExpectedRows: 0,
Rows: CsvRows{},
}, FileTest{
}, {
File: "./test-data/mail-with-bad-zip.txt",
ExpectedRows: 0,
Rows: CsvRows{},
}, FileTest{
}, {
File: "./test-data/mail-with-non-dmarc-zip.txt",
ExpectedRows: 0,
Rows: CsvRows{},
}, FileTest{
}, {
File: "./test-data/mail-without-attachments.txt",
ExpectedRows: 0,
Rows: CsvRows{},
},
FileTest{
{
File: "./test-data/mail-with-zipped-dmarc-report.txt",
ExpectedRows: 1,
Rows: CsvRows{
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestReadMailToRows(t *testing.T) {
},
},
},
FileTest{
{
File: "./test-data/mail-with-gzip-dmarc-report.txt",
ExpectedRows: 1,
Rows: CsvRows{
Expand Down
5 changes: 2 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ module github.com/JamesJJ/dmarc-report-ses-tsv
go 1.15

require (
github.com/aws/aws-lambda-go v1.23.0 // indirect
github.com/aws/aws-sdk-go v1.37.25
github.com/aws/aws-lambda-go v1.36.0
github.com/aws/aws-sdk-go v1.44.159
github.com/jamesjj/podready v0.0.0-20191018164617-7f5557ad2b99
github.com/jamiealquiza/envy v1.1.0
github.com/jhillyerd/enmime v0.8.4
github.com/spf13/cobra v1.1.3 // indirect
github.com/urfave/cli v1.21.0 // indirect
)
54 changes: 38 additions & 16 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion graceful-stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
func gracefulStop(additional func()) {

// Handle ^C and SIGTERM gracefully
var gracefulStop = make(chan os.Signal)
var gracefulStop = make(chan os.Signal, 1)
signal.Notify(gracefulStop, syscall.SIGTERM, syscall.SIGINT)
go func() {
sig := <-gracefulStop
Expand Down
2 changes: 0 additions & 2 deletions lambda-function.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package main

import ()

func handleRequest() {
run()
}
57 changes: 31 additions & 26 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,13 @@ type config struct {
moveFilesAfterProcessing *string
logVerbose *bool
sqsDelete *bool
excludeDispositionNone *bool
runDate *string
}

func init() {
rand.Seed(time.Now().UnixNano())
logInit(false)
}

func main() {
if os.Getenv("AWS_LAMBDA_FUNCTION_NAME") != "" {
runtime.Start(handleRequest)
} else {
run()
}
}

func run() {
runDate := time.Now().UTC().Format("20060102")

conf = config{
flag.String("sqs", "", "Name of the SQS queue to poll [MANDATORY]"),
flag.String("sqsregion", "", "AWS region of SQS queue [MANDATORY]"),
Expand All @@ -65,8 +53,25 @@ func run() {
flag.String("move", "", "Move email to this S3 prefix after processing. Date will be automatically added"),
flag.Bool("verbose", false, "Show detailed information during run"),
flag.Bool("deletesqs", true, "Delete messages from SQS after processing"),
&runDate,
flag.Bool("excludedispositionnone", false, "Exclude DMARC records with 'none' disposition"),
nil,
}

}

func main() {
if os.Getenv("AWS_LAMBDA_FUNCTION_NAME") != "" {
runtime.Start(handleRequest)
} else {
run()
}
}

func run() {

runDate := time.Now().UTC().Format("20060102")
conf.runDate = &runDate

envy.Parse("DMARC")
flag.Parse()

Expand Down Expand Up @@ -95,33 +100,33 @@ func run() {

if *conf.moveFilesAfterProcessing != "" {
wg.Add(1)
go func(conf config, wg *sync.WaitGroup, moveS3FileChan chan *S3EventRecord) {
go func(wg *sync.WaitGroup, moveS3FileChan chan *S3EventRecord) {
defer wg.Done()
for {
err := S3Move(conf, moveS3FileChan)
err := S3Move(moveS3FileChan)
if err == nil {
break
}
time.Sleep(3 * time.Second)
}
}(conf, &wg, moveS3FileChan)
}(&wg, moveS3FileChan)

}

wg.Add(1)
go func(conf config, wg *sync.WaitGroup, deleteSqsChan chan *string) {
go func(wg *sync.WaitGroup, deleteSqsChan chan *string) {
defer wg.Done()
for {
err := sqsDelete(conf, deleteSqsChan)
err := sqsDelete(deleteSqsChan)
if err == nil {
break
}
time.Sleep(3 * time.Second)
}
}(conf, &wg, deleteSqsChan)
}(&wg, deleteSqsChan)

wg.Add(1)
go func(conf config, wg *sync.WaitGroup) {
go func(wg *sync.WaitGroup) {
defer wg.Done()

for file := range uploadToS3Chan {
Expand All @@ -134,16 +139,16 @@ func run() {
)
returnToPool(file)
}
}(conf, &wg)
}(&wg)

wg.Add(1)
go func(conf config, wg *sync.WaitGroup) {
go func(wg *sync.WaitGroup) {
defer func() {
close(uploadToS3Chan)
wg.Done()
}()
WriteTSV(conf, writeTSVChan, uploadToS3Chan)
}(conf, &wg)
WriteTSV(writeTSVChan, uploadToS3Chan)
}(&wg)

gracefulStop(func() {})

Expand All @@ -152,7 +157,7 @@ func run() {

Debug.Printf("pollCount=%d", pollCount)

s3records, err := PollSQS(conf)
s3records, err := PollSQS()
if err != nil {
Error.Printf("Failed to poll SQS: %v", err)
pollCount--
Expand Down
2 changes: 1 addition & 1 deletion s3-move.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func S3RenameFile(svc *s3.S3, bucket *string, source *string, destination *strin
return nil
}

func S3Move(conf config, moveS3FileChan chan *S3EventRecord) error {
func S3Move(moveS3FileChan chan *S3EventRecord) error {

// This session is using conf for region, so is in report output bucket region
// Files being moved are in email input bucket region
Expand Down
7 changes: 5 additions & 2 deletions sqs-poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"strings"
)

func sqsDelete(conf config, deleteSqsChan chan *string) error {
func sqsDelete(deleteSqsChan chan *string) error {

sess, err := session.NewSession(&aws.Config{
Region: aws.String(*conf.sqsRegion)},
Expand Down Expand Up @@ -44,11 +44,14 @@ func sqsDelete(conf config, deleteSqsChan chan *string) error {
return nil
}

func PollSQS(conf config) ([]*S3EventMsg, error) {
func PollSQS() ([]*S3EventMsg, error) {

sess, err := session.NewSession(&aws.Config{
Region: aws.String(*conf.sqsRegion)},
)
if err != nil {
return nil, fmt.Errorf("Unable to establish session: %v", err)
}

Debug.Printf("Polling SQS: %s, in %s", *conf.sqsName, *conf.sqsRegion)

Expand Down
2 changes: 1 addition & 1 deletion tsv-write.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func returnToPool(b *bytes.Buffer) {
bufPool.Put(b)
}

func WriteTSV(conf config, records <-chan *CsvRow, tsvDataOut chan<- *bytes.Buffer) {
func WriteTSV(records <-chan *CsvRow, tsvDataOut chan<- *bytes.Buffer) {

// TODO: Better actions if here finds errors

Expand Down

0 comments on commit 1f67ceb

Please sign in to comment.