
Speed up RITA parser #654

Merged
merged 24 commits into master from speed-up-parser on Aug 20, 2021

Conversation

@Zalgo2462 Zalgo2462 (Contributor) commented Jul 7, 2021

Goal

This PR speeds up the RITA parser for both JSON and TSV logs. In one benchmark parsing a large set of JSON files, this PR executes in a quarter of the time needed by the master branch.

Note to the reviewer

I recommend reviewing this PR commit by commit, as it contains a large number of changes. If that is overwhelming, we can split the individual commits off into their own PRs.

Changes

  • In activecm/rita@bfed5a9, I refactored the dependencies for the FSImporter. This was important because any profiling done on the FSImporter simply reported that parseFiles was taking a long time. parseFiles contained the parsing logic for each type of log file. In order to break up this method, I needed to simplify the method's dependencies.

  • In activecm/rita@05627e5, the logic for parsing each type of file has been moved to its own file. Additionally, breaking up this logic allowed me to break up the mutex that was used to make writing out the parsing results threadsafe. In this revision, each map within the parse results has its own mutex, allowing for greater parallelism (a sketch of this per-map locking pattern follows this list).

  • activecm/rita@741a9cd adds a printed message telling the user how long it took to read in a batch of logs. Additionally, this commit embeds profiling logic as comments in the code to make it easier to enable CPU and RAM profiling. We can revert this commit if need be.

  • In activecm/rita@ae7cf37, I replaced calls to strings.Split with calls to strings.Index and slice operations in order to reduce string allocations in the TSV parsing code (sketched after this list). Since this section of code was called for each line we parsed, the targeted allocations created a good deal of garbage that the GC had to constantly clean up.

  • In activecm/rita@ddef24b, strconv.ParseInt was replaced with strconv.Atoi because the Go standard library has a fast path for small numbers in the latter but not the former (sketched after this list). The ParseInt method was identified as a major time sink in the TSV parser via go tool pprof.

  • In activecm/rita@a0cfecd, the map which tells the TSV parser which struct field each sequential token maps to was replaced with an array (sketched after this list). The profiler indicated that the TSV parser spent the majority of its time hashing field names. Since the fields were always accessed sequentially, it was possible to map from the sequential index directly to the target struct fields.

  • In activecm/rita@0e911ee and activecm/rita@dbf8ce9, the Go gzip library was replaced with calls to the system pigz or gzip commands (sketched after this list). The profiler indicated that much of the time in the parse loop was spent decompressing files. After a Google search, I found that Docker has run into similar issues (Gzip performance moby/moby#10181) and implemented a fix similar to theirs. This saved a great deal of time for both the TSV and JSON parsers.

  • In activecm/rita@0616f3e, I replaced the standard JSON parser with a faster library (sketched after this list). The profiler indicated that much of the time spent by the JSON parser was deep in the Go standard library. After a few searches, I found that Prometheus, Docker, Grafana, and many other well-known Go projects use the json-iter package to speed up their JSON marshalling. It was a drop-in replacement and yielded a massive speedup.

  • In activecm/rita@cd8dd38 and activecm/rita@180de84, I converted the UniqueIPSet implementation from a slice to a hashmap (sketched after this list). The profiler indicated that the parsing code spent a great deal of time running linear searches against the UniqueIPSets when processing hostnames. Switching the implementation to a hashmap sped up the process considerably at the expense of around 4x the RAM for the sets themselves; however, overall RAM usage has not grown considerably.

  • In activecm/rita@1cf8e14, I changed the batching limit to take the greater of the old default (4GB) or half of the system's total memory. This allows us to run the MongoDB analysis phase quicker when the system can support it.

  • In activecm/rita@c4b5b3f and activecm/rita@12215e0, I replaced array-based string sets which used calls to StringInSlice with map-based string sets similar to the UniqueIP sets introduced in activecm/rita@cd8dd38.

  • In activecm/rita@43a7dc7, I added a bit of code in util.ContainsIP and util.IPIsPubliclyRoutable to cache the result of net.IP.To4(), which is called by many of the net API methods (sketched after this list). It turns out that when you parse an IPv4 address in Go, the IP is stored using 16 bytes, and the net API then slices out the 4 bytes it needs via To4() in each of its methods.

  • In activecm/rita@4f78c1e, I reordered the checks in data.UniqueIP to prevent parsing empty UUIDs. In previous versions, we'd call uuid.Parse("") at least twice for every log entry. Each of these calls would result in allocating an error object on the heap that we'd immediately throw away, creating a good deal of work for the garbage collector.

  • In activecm/rita@3028e62, I fixed up the SSL parser to increment the hostMap source/destination counters, addressing issue #684 (host collection connection counts are undercounted).
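
A minimal sketch of the per-map locking pattern described in activecm/rita@05627e5, assuming a simplified results struct; the type and field names here are illustrative, not RITA's actual ones.

```go
package sketch

import "sync"

// Illustrative only: each results map is paired with its own mutex, so a goroutine
// updating host results no longer blocks a goroutine updating connection results
// (previously a single mutex guarded every map).
type parseResults struct {
	hostLock sync.Mutex
	hosts    map[string]int

	uconnLock sync.Mutex
	uconns    map[string]int
}

func newParseResults() *parseResults {
	return &parseResults{
		hosts:  make(map[string]int),
		uconns: make(map[string]int),
	}
}

func (r *parseResults) addHost(key string) {
	r.hostLock.Lock()
	defer r.hostLock.Unlock()
	r.hosts[key]++
}

func (r *parseResults) addUconn(key string) {
	r.uconnLock.Lock()
	defer r.uconnLock.Unlock()
	r.uconns[key]++
}
```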
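
A minimal sketch of the strings.Split → strings.Index change from activecm/rita@ae7cf37; the helper name and the tab separator are assumptions for illustration.

```go
package sketch

import "strings"

// Illustrative only: walk the line with strings.Index and hand out sub-slices of the
// original string, rather than allocating a fresh []string for every line with
// strings.Split.
func forEachField(line string, fn func(field string)) {
	for {
		i := strings.Index(line, "\t")
		if i < 0 {
			fn(line)
			return
		}
		fn(line[:i]) // sub-slice shares the line's backing bytes; no new allocation
		line = line[i+1:]
	}
}
```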
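
A minimal sketch of the strconv.ParseInt → strconv.Atoi swap from activecm/rita@ddef24b; the token value is made up.

```go
package main

import (
	"fmt"
	"strconv"
)

func main() {
	tok := "443"

	// Before: always goes through the general-purpose base/bit-size parser.
	before, _ := strconv.ParseInt(tok, 10, 64)

	// After: Atoi takes a fast path for small decimal numbers and returns an int,
	// which can be widened to int64 where a struct field requires it.
	after, _ := strconv.Atoi(tok)

	fmt.Println(before, int64(after))
}
```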
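
A minimal sketch of the hashmap → array field mapping from activecm/rita@a0cfecd; the function and parameter names are hypothetical.

```go
package sketch

// Illustrative only: instead of hashing each Zeek field name for every token on
// every line, build a token-index -> struct-field-index table once per file header
// and index into it while parsing.
func buildFieldOffsets(headerFields []string, nameToStructField map[string]int) []int {
	offsets := make([]int, len(headerFields))
	for i, name := range headerFields {
		if idx, ok := nameToStructField[name]; ok {
			offsets[i] = idx
		} else {
			offsets[i] = -1 // token i does not map onto the target struct
		}
	}
	return offsets
}
```

The per-line loop then becomes a plain `offsets[i]` lookup with no hashing.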
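
A minimal sketch of shelling out to pigz/gzip as in activecm/rita@0e911ee and activecm/rita@dbf8ce9, in the spirit of the Docker fix linked above; the exact flags, fallback logic, and error handling in the PR may differ.

```go
package sketch

import (
	"io"
	"os/exec"
)

// Illustrative only: decompress by piping the file through pigz, falling back to
// gzip when pigz is not installed. In real code the caller must also drain the
// reader and call cmd.Wait to reap the child process.
func openGzip(path string) (io.ReadCloser, error) {
	bin := "pigz"
	if _, err := exec.LookPath(bin); err != nil {
		bin = "gzip"
	}
	cmd := exec.Command(bin, "-d", "-c", path) // decompress to stdout
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return nil, err
	}
	if err := cmd.Start(); err != nil {
		return nil, err
	}
	return stdout, nil
}
```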
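
A minimal sketch of the drop-in JSON library swap from activecm/rita@0616f3e, assuming github.com/json-iterator/go; the Zeek entry struct here is a trimmed-down stand-in.

```go
package sketch

import (
	jsoniter "github.com/json-iterator/go"
)

// Illustrative only: json-iterator exposes a config that mimics encoding/json,
// so existing Unmarshal call sites can stay unchanged.
var json = jsoniter.ConfigCompatibleWithStandardLibrary

type connEntry struct {
	TS    float64 `json:"ts"`
	UID   string  `json:"uid"`
	Proto string  `json:"proto"`
}

func parseLine(line []byte) (connEntry, error) {
	var entry connEntry
	err := json.Unmarshal(line, &entry) // same signature as encoding/json
	return entry, err
}
```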
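
A minimal sketch of the slice → map set conversion from activecm/rita@cd8dd38 and activecm/rita@180de84; the UniqueIP fields and the key function are simplified stand-ins for RITA's actual types.

```go
package sketch

// Illustrative only: a map-keyed set replaces the slice that was searched linearly
// on every insert.
type UniqueIP struct {
	IP          string
	NetworkUUID string
}

func (u UniqueIP) mapKey() string { return u.IP + u.NetworkUUID }

// UniqueIPSet holds at most one instance of each UniqueIP.
type UniqueIPSet map[string]UniqueIP

// Insert adds the IP to the set; a duplicate insert is an O(1) no-op instead of an O(n) scan.
func (s UniqueIPSet) Insert(ip UniqueIP) {
	s[ip.mapKey()] = ip
}

// Items converts the set back to a slice for callers that need indexed access.
func (s UniqueIPSet) Items() []UniqueIP {
	items := make([]UniqueIP, 0, len(s))
	for _, ip := range s {
		items = append(items, ip)
	}
	return items
}
```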
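
A minimal sketch of the To4() caching from activecm/rita@43a7dc7; the private-block parameter is passed in here only to keep the example self-contained.

```go
package sketch

import "net"

// Illustrative only: net.IP stores a parsed IPv4 address as 16 bytes, and each
// ip.IsXXX call internally re-derives the 4-byte form. Converting once up front
// lets every later check operate on the already-sliced value.
func isPubliclyRoutable(ip net.IP, privateBlocks []*net.IPNet) bool {
	if ip4 := ip.To4(); ip4 != nil {
		ip = ip4 // cache the IPv4 view so the checks below don't redo the conversion
	}
	if ip.IsLoopback() || ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() {
		return false
	}
	for _, block := range privateBlocks {
		if block.Contains(ip) {
			return false
		}
	}
	return true
}
```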

Performance Testing

I've tested the changes on a large JSON dataset as well as several small TSV datasets.
The JSON dataset consisted of 3414 gzipped Zeek logs of which 2063 were conn, http, ssl, and dns logs. The 2063 logs that needed to be processed by RITA totaled 70.385 GB.

While testing for performance, I used Go's pprof tool to find the sections of code which took up the majority of the execution time.
I commented out the lines in fsimporter.go which build the MongoDB collections from the parse results, beginning with

// Set chunk before we continue so if process dies, we still verify with a delete if
// any data was written out.
fs.metaDB.SetChunk(fs.config.S.Rolling.CurrentChunk, fs.database.GetSelectedDB(), true)

down to

// record file+database name hash in metadatabase to prevent duplicate content
fmt.Println("\t[-] Indexing log entries ... ")
err := fs.metaDB.AddNewFilesToIndex(indexedFileBatch)
if err != nil {
	fs.log.Error("Could not update the list of parsed files")
}

I added a line to the original master branch to print out how long it took to import each batch of files, and ran it against the large JSON dataset on a DO droplet with (virtualized) SSD storage, 16GB of RAM, and 8 CPU cores. The results were as follows:

Master branch
        [-] Finished parsing logs in 5m28.344s
        [-] Finished parsing logs in 8m5.327s
        [-] Finished parsing logs in 10m8.033s
        [-] Finished parsing logs in 15m42.579s
        [-] Finished parsing logs in 20m5.076s
        [-] Finished parsing logs in 19m41.801s
        [-] Finished parsing logs in 19m4.292s
        [-] Finished parsing logs in 15m17.52s
        [-] Finished parsing logs in 15m12.626s
        [-] Finished parsing logs in 15m22.514s
        [-] Finished parsing logs in 15m52.611s
        [-] Finished parsing logs in 12m2.84s
        [-] Finished parsing logs in 7m47.568s
        [-] Finished parsing logs in 9m22.946s
        [-] Finished parsing logs in 7m12.611s
        [-] Finished parsing logs in 6m58.84s
        [-] Finished parsing logs in 9m58.732s
        [-] Finished parsing logs in 7m56.549s
        TOTAL: 3h41m20.809s

[image: customer-cpu-parse-master-pprof]

Compare this to the results after the PR changes:

        [-] Finished parsing logs in 3m7.787s
        [-] Finished parsing logs in 6m45.158s
        [-] Finished parsing logs in 6m5.804s
        [-] Finished parsing logs in 6m16.351s
        [-] Finished parsing logs in 6m14.18s
        [-] Finished parsing logs in 5m57.668s
        [-] Finished parsing logs in 6m30.06s
        [-] Finished parsing logs in 5m37.612s
        [-] Finished parsing logs in 5m23.952s
        TOTAL: 51m58.572s

[image: customer-cpu-parse-4f78c1e-pprof]

I've attached high-resolution SVGs of the images above, as well as the raw pprof data, which can be opened via go tool pprof.
pprof_data.zip

Logan L added 8 commits July 15, 2021 13:02
…with open connections where the host map's counters for unexpected protocol port service tuples weren't being incremented
…ted extra strings, function runs ~1.2x faster now, and GC is doing a small bit better
…eld offsets using an array. We previously mapped from each Zeek field's name to the offsets using a hashmap. This took a lot of time since the code was executed a lot.
…r specifically uses pigz for this purpose.
Logan L added 4 commits July 15, 2021 18:10
… either 4GB (as before) or half of system RAM. Note that RAM usage is much lower than the batch limit since we don't store every line we read.
@Zalgo2462 Zalgo2462 marked this pull request as ready for review July 17, 2021 01:39
@Zalgo2462 (Contributor Author) commented:

The latest master merge commit activecm/rita@93d094a implements #665

@Zalgo2462 Zalgo2462 left a comment

Left some comments noting where the different commits pop up in the PR.

@@ -232,12 +232,12 @@ func (i *Importer) run() error {
}
i.res.Config.S.Rolling = rollingCfg

-importer := parser.NewFSImporter(i.res, i.threads, i.threads, i.importFiles)
+importer := parser.NewFSImporter(i.res)
@Zalgo2462 (Contributor Author) commented:

FSImporter was refactored such that the threads and file set are now passed in via importer.CollectFileDetails and importer.Run.

activecm/rita@bfed5a9

@@ -259,7 +259,20 @@ func (i *Importer) run() error {
fmt.Printf("\t[+] Non-rolling database %v will be converted to rolling\n", i.targetDatabase)
}

importer.Run(indexedFiles)
/*
@Zalgo2462 (Contributor Author) commented:

This could be removed if desired.

activecm/rita@741a9cd

@@ -288,7 +301,7 @@ func (i *Importer) handleDeleteOldData() error {

// Remove the analysis results for the chunk
targetChunk := i.res.Config.S.Rolling.CurrentChunk
-removerRepo := remover.NewMongoRemover(i.res)
+removerRepo := remover.NewMongoRemover(i.res.DB, i.res.Config, i.res.Log)
@Zalgo2462 (Contributor Author) commented:

All of the packages under pkg have been refactored to take in the DB, Config, and Log structures separately in their constructors.

activecm/rita@bfed5a9

)

// https://gist.github.com/harshavardhana/327e0577c4fed9211f65#gistcomment-2557682
func duration(d time.Duration) string {
@Zalgo2462 (Contributor Author) commented:

This has been moved into the util package. I have added a line which prints out how long it took to read in the files from the filesystem and I wanted to reuse the code here for formatting.

activecm/rita@741a9cd

@@ -6,7 +6,7 @@ import (
"time"

"github.com/activecm/rita/config"
fpt "github.com/activecm/rita/parser/fileparsetypes"
@Zalgo2462 (Contributor Author) commented:

I've moved around some of the files in the parser/ directory in order to separate logic dependent on the file system from the logic involved in aggregating log entries.

In this case IndexedFile has moved from parser/indexedfile.go to parser/files/indexing.go.

activecm/rita@bfed5a9

@@ -60,21 +60,23 @@ func (a *analyzer) start() {
// set up writer output
var output update

if len(datum.OrigIps) > 10 {
datum.OrigIps = datum.OrigIps[:10]
origIPs := datum.OrigIps.Items()
@Zalgo2462 (Contributor Author) commented:

Note that .Items() must be called to convert the map into an array.

activecm/rita@cd8dd38
activecm/rita@180de84

-indexingThreads int, parseThreads int, importFiles []string) *FSImporter {
+func NewFSImporter(res *resources.Resources) *FSImporter {
// set batchSize to the max of 4GB or a half of system RAM to prevent running out of memory while importing
batchSize := int64(util.MaxUint64(4*(1<<30), (memory.TotalMemory() / 2)))
@Zalgo2462 (Contributor Author) commented:

In 1cf8e14, I changed the batching limit to take the greater of the old default (4GB) or half of the system's total memory. This allows us to run the MongoDB analysis phase quicker when the system can support it.

@@ -54,6 +54,11 @@ func ParseSubnets(subnets []string) (parsedSubnets []*net.IPNet) {

//IPIsPubliclyRoutable checks if an IP address is publicly routable. See privateIPBlocks.
func IPIsPubliclyRoutable(ip net.IP) bool {
// cache IPv4 conversion so it is not performed in every ip.IsXXX method
@Zalgo2462 (Contributor Author) commented:

In 43a7dc7, I added a bit of code in util.ContainsIP and util.IPIsPubliclyRoutable to cache the result of net.IP.To4(), which is called by many of the net API methods. It turns out that when you parse an IPv4 address in Go, the IP is stored using 16 bytes, and the net API then slices out the 4 bytes it needs via To4() in each of its methods.

return
}

func updateHostsBySSL(srcIP, dstIP net.IP, srcUniqIP, dstUniqIP data.UniqueIP, srcKey, dstKey string,
@Zalgo2462 (Contributor Author) commented:

Fixes issue #684.

@@ -88,3 +99,40 @@ func StringInSlice(value string, list []string) bool {
}
return false
}

//Int64InSlice returns true if the int64 is an element of the array
func Int64InSlice(value int64, list []int64) bool {
@Zalgo2462 (Contributor Author) commented:

It seemed the int64 linear searches did not take up too much time (mainly used when unique'ing unique connection timestamps). My guess is that the Go compiler is smart enough to optimize the linear scan using SIMD or that the CPU is smart enough to speculatively execute the checks in parallel. (Moved from fsimporter.)

@Zalgo2462 (Contributor Author) commented:

Merged master back into this branch.
Integrated #690 and double checked that the rolling proxy beacons were still being reported correctly.

@fullmetalcache fullmetalcache left a comment

On a first pass, everything looks good to me. Looks like the majority of changes were splitting out the parsers into different files, changing variable types/functions called, other optimization tricks, and some renaming/reorganizing of other variable types to improve readability. Just one minor question on the renaming of bro to zeek. Not a big deal though.

I will test this out and give it a second pass tomorrow. If everything else checks out, I am good with approving this.

Nice work man, so many good changes and learned so much here! Incredible how some of those variable types and function choices can make such a huge difference. If we don't already have something, we should consider having a doc or something else with some of these efficiency findings for types and functions so that we can refer to them in the future.

}

func mapZeekHeaderToParseType(header *BroHeader, broDataFactory func() pt.BroData, logger *log.Logger) (ZeekHeaderIndexMap, error) {
broData := broDataFactory()
@fullmetalcache (Contributor) commented:

Kind of a tough one, as it goes along with the comment above, but would we want to rename this to zeekData? Either way, at some point I think we should change all Bro references to Zeek for consistency. I will only mention the naming once here so as not to clutter the review with comments about renaming Bro to Zeek. If we decide to do that, we can just do some find-and-replacing rather than me commenting on every instance.

@Zalgo2462 (Contributor Author) commented:

I kept this one as broData since the type was kept as pt.BroData from previous versions. I've added an issue to the tracker to generally rename the references from Bro to Zeek (#693).

res *resources.Resources
min int64
max int64
database *database.DB
@fullmetalcache (Contributor) commented:

Nice, I like how this removes a level from a lot of the places where we use this struct. Makes things a bit cleaner.

@@ -202,23 +209,24 @@ func (p UniqueIPPair) BSONKey() bson.M {
//UniqueIPSet is a set of UniqueIPs which contains at most one instance of each UniqueIP
//this implementation is based on a slice of UniqueIPs rather than a map[string]UniqueIP
//since it requires less RAM.
-type UniqueIPSet []UniqueIP
+type UniqueIPSet map[string]UniqueIP
@fullmetalcache (Contributor) commented:

Very nice on the type replacement

@fullmetalcache fullmetalcache left a comment

Found just one very minor thing on a second pass. Tested on a semi-large dataset on a system with 16GB RAM and 8 cores allocated, running on a server with eight 10K disks in a RAID 10 config. Got around 8 minutes of parse time on current master. Got around 4 minutes with these changes. Nice!

err := errors.New("type mismatch found in log")
logger.WithFields(log.Fields{
"error": err,
"type in log": header.Types[index],
@fullmetalcache (Contributor) commented:

Minor thing but can we please remove the spaces and replace with underscores for the field names?

@Zalgo2462 (Contributor Author) commented:

Will do

@fullmetalcache fullmetalcache left a comment

LGTM!

@fullmetalcache fullmetalcache merged commit 13b2e77 into master Aug 20, 2021
@fullmetalcache fullmetalcache deleted the speed-up-parser branch August 20, 2021 21:28
@Zalgo2462 Zalgo2462 mentioned this pull request Apr 19, 2022