Skip to content

Commit

Permalink
Merge pull request #1039 from ninoseki/fix-parallel-enricher-issue
Browse files Browse the repository at this point in the history
feat: fix parallel enrichment issue
  • Loading branch information
ninoseki authored Jan 21, 2024
2 parents b843c48 + 8dd3120 commit 7676477
Show file tree
Hide file tree
Showing 12 changed files with 47 additions and 47 deletions.
5 changes: 0 additions & 5 deletions docs/enrichers/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ options:
retry_times: ...
retry_interval: ...
retry_exponential_backoff: ...
parallel: ...
```
### Timeout
Expand All @@ -34,7 +33,3 @@ options:
### Retry Exponential Backoff

`retry_exponential_backoff` (`bool`) controls whether to do exponential backoff. Optional. Defaults to `true`. Configurable via `RETRY_EXPONENTIAL_BACKOFF` environment variable.

### Parallel

`parallel` (`bool`) controls whether to allow parallel execution or not. Optional. Defaults to `false`. Configurable via `PARALLEL` environment variable.
7 changes: 0 additions & 7 deletions lib/mihari/actor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,6 @@ def timeout
options[:timeout]
end

#
# @return [Boolean]
#
def parallel?
options[:parallel] || Mihari.config.parallel
end

def validate_configuration!
return if configured?

Expand Down
7 changes: 7 additions & 0 deletions lib/mihari/analyzers/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ def ignore_error?
options[:ignore_error] || Mihari.config.ignore_error
end

#
# @return [Boolean]
#
def parallel?
options[:parallel] || Mihari.config.parallel
end

# @return [Array<String>, Array<Mihari::Models::Artifact>]
def artifacts
raise NotImplementedError, "You must implement #{self.class}##{__method__}"
Expand Down
7 changes: 7 additions & 0 deletions lib/mihari/emitters/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ def initialize(rule:, options: nil)
@rule = rule
end

#
# @return [Boolean]
#
def parallel?
options[:parallel] || Mihari.config.parallel
end

# A target to emit the data
#
# @return [String]
Expand Down
18 changes: 5 additions & 13 deletions lib/mihari/rule.rb
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,11 @@ def unique_artifacts
# @return [Array<Mihari::Models::Artifact>]
#
def enriched_artifacts
@enriched_artifacts ||= unique_artifacts.map do |artifact|
serial_enrichers.each { |enricher| enricher.result(artifact) }
Parallel.each(parallel_enrichers) { |enricher| enricher.result(artifact) }

artifact
@enriched_artifacts ||= Parallel.map(unique_artifacts) do |artifact|
artifact.tap do |tapped|
# NOTE: To apply changes correctly, enrichers should be applied to an artifact serially
enrichers.each { |enricher| enricher.result(tapped) }
end
end
end

Expand Down Expand Up @@ -404,14 +404,6 @@ def enrichers
end
end

def parallel_enrichers
enrichers.select(&:parallel?)
end

def serial_enrichers
enrichers.reject(&:parallel?)
end

#
# Validate the data format
#
Expand Down
2 changes: 1 addition & 1 deletion lib/mihari/schemas/analyzer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,6 @@ module Analyzers
end
end

Analyzer = Schemas::Analyzers.get_or_composition
Analyzer = Schemas::Analyzers.compose_by_or
end
end
2 changes: 1 addition & 1 deletion lib/mihari/schemas/concerns/orrable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Concerns
module Orrable
extend ActiveSupport::Concern

def get_or_composition
def compose_by_or
schemas = constants.map { |sym| const_get sym }
return schemas.first if schemas.length <= 1

Expand Down
12 changes: 6 additions & 6 deletions lib/mihari/schemas/emitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,28 @@ module Emitters

Database = Dry::Schema.Params do
required(:emitter).value(Types::String.enum(*Mihari::Emitters::Database.keys))
optional(:options).hash(Options)
optional(:options).hash(EmitterOptions)
end

MISP = Dry::Schema.Params do
required(:emitter).value(Types::String.enum(*Mihari::Emitters::MISP.keys))
optional(:url).filled(:string)
optional(:api_key).filled(:string)
optional(:options).hash(Options)
optional(:options).hash(EmitterOptions)
end

TheHive = Dry::Schema.Params do
required(:emitter).value(Types::String.enum(*Mihari::Emitters::TheHive.keys))
optional(:url).filled(:string)
optional(:api_key).filled(:string)
optional(:options).hash(Options)
optional(:options).hash(EmitterOptions)
end

Slack = Dry::Schema.Params do
required(:emitter).value(Types::String.enum(*Mihari::Emitters::Slack.keys))
optional(:webhook_url).filled(:string)
optional(:channel).filled(:string)
optional(:options).hash(Options)
optional(:options).hash(EmitterOptions)
end

Webhook = Dry::Schema.Params do
Expand All @@ -40,10 +40,10 @@ module Emitters
optional(:method).value(Types::HTTPRequestMethods).default("POST")
optional(:headers).filled(:hash)
optional(:template).filled(:string)
optional(:options).hash(Options)
optional(:options).hash(EmitterOptions)
end
end

Emitter = Schemas::Emitters.get_or_composition
Emitter = Schemas::Emitters.compose_by_or
end
end
2 changes: 1 addition & 1 deletion lib/mihari/schemas/enricher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ module Enrichers
end
end

Enricher = Schemas::Enrichers.get_or_composition
Enricher = Schemas::Enrichers.compose_by_or
end
end
25 changes: 15 additions & 10 deletions lib/mihari/schemas/options.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,29 @@
module Mihari
module Schemas
Options = Dry::Schema.Params do
optional(:retry_times).value(:integer).default(Mihari.config.retry_times)
optional(:retry_interval).value(:integer).default(Mihari.config.retry_interval)
optional(:retry_exponential_backoff).value(:bool).default(Mihari.config.retry_exponential_backoff)
optional(:retry_times).value(:integer)
optional(:retry_interval).value(:integer)
optional(:retry_exponential_backoff).value(:bool)
optional(:timeout).value(:integer)
optional(:parallel).value(:bool).default(Mihari.config.parallel)
end

IgnoreErrorOptions = Dry::Schema.Params do
optional(:ignore_error).value(:bool).default(Mihari.config.ignore_error)
ParallelOptions = Dry::Schema.Params do
optional(:parallel).value(:bool)
end

AnalyzerOptions = Options | IgnoreErrorOptions
IgnoreErrorOptions = Dry::Schema.Params do
optional(:ignore_error).value(:bool)
end

PaginationOptions = Dry::Schema.Params do
optional(:pagination_interval).value(:integer).default(Mihari.config.pagination_interval)
optional(:pagination_limit).value(:integer).default(Mihari.config.pagination_limit)
optional(:pagination_interval).value(:integer)
optional(:pagination_limit).value(:integer)
end

AnalyzerPaginationOptions = AnalyzerOptions | PaginationOptions
AnalyzerOptions = Options & IgnoreErrorOptions & ParallelOptions

AnalyzerPaginationOptions = AnalyzerOptions & PaginationOptions

EmitterOptions = Options & ParallelOptions
end
end
6 changes: 3 additions & 3 deletions lib/mihari/schemas/rule.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ module Schemas
optional(:created_on).value(:date)
optional(:updated_on).value(:date)

required(:queries).value(:array).each { Analyzer } # rubocop:disable Lint/Void
optional(:emitters).value(:array).each { Emitter }.default(DEFAULT_EMITTERS) # rubocop:disable Lint/Void
optional(:enrichers).value(:array).each { Enricher }.default(DEFAULT_ENRICHERS) # rubocop:disable Lint/Void
required(:queries).array { Analyzer }
optional(:emitters).array { Emitter }.default(DEFAULT_EMITTERS)
optional(:enrichers).array { Enricher }.default(DEFAULT_ENRICHERS)

optional(:data_types).filled(array[Types::DataTypes]).default(Mihari::Types::DataTypes.values)

Expand Down
1 change: 1 addition & 0 deletions spec/commands/search_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class SearchCLI < Mihari::CLI::Base

before do
allow(rule).to receive(:enrichers).and_return([])
allow(Parallel).to receive(:processor_count).and_return(0)
end

describe "#search" do
Expand Down

0 comments on commit 7676477

Please sign in to comment.