Skip to content

Commit

Permalink
Adds stats
Browse files Browse the repository at this point in the history
  • Loading branch information
Mr-Lopes committed Nov 8, 2023
1 parent a0293db commit fe76d26
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 38 deletions.
6 changes: 5 additions & 1 deletion deployment/deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ function init() {
# Read the setup locations (sheet / JSON / Firestore) from config.json.
SETUP_SHEET_ID=$(cat config.json | jq -r '.setup_sheet_id')
SETUP_JSON_URL=$(cat config.json | jq -r '.setup_json_url')
SETUP_FIRESTORE_COLLECTION=$(cat config.json | jq -r '.setup_firestore_collection')

# Ask whether anonymous crash/usage stats may be collected; the answer is
# forwarded to deploy_cloud.sh as its 5th positional argument.
echo -n "Opt in to collect crash/usage stats to improve the solution. This helps us support the solution (E.g. true/false): "
read -r SHARE_CRASH_USAGE_STATS
echo
# Validate params before deployment
check_valid_parameters
# Confirm details
Expand Down Expand Up @@ -259,7 +263,7 @@ function init() {
# Build metadata and copy it to Cloud Storage
start_message "Building Dataflow metadata..."
cd ..
# SHARE_CRASH_USAGE_STATS (true/false) is the stats opt-in collected earlier.
sh ./deployment/deploy_cloud.sh ${GOOGLE_CLOUD_PROJECT} ${BUCKET_NAME} ${REGION} ${SERVICE_ACCOUNT} ${SHARE_CRASH_USAGE_STATS}
echo

echo "${bold}${text_green} Done!${reset}"
Expand Down
18 changes: 15 additions & 3 deletions deployment/deploy_cloud.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
# limitations under the License.


# Exactly five positional arguments are required (the 5th is the stats opt-in).
if [ $# != 5 ]; then
  echo "Usage: $0 gcp_project_id bucket_name region service_account_email share_crash_usage_stats(true/false)"
  echo "Sharing crash/usage stats helps us support the solution (E.g. true/false)"
  exit 1
fi

Expand All @@ -30,7 +31,18 @@ python3 -m pip install --user -q -r requirements.txt
echo $4
echo "Update commit info inside code"
# Stamp the current git commit hash into config/version.py (MEGALISTA_VERSION).
sed -i "s/MEGALISTA_VERSION\s*=.*/MEGALISTA_VERSION = '$(git rev-parse HEAD)'/" ./config/version.py
# Build the Dataflow template; "$5" forwards the crash/usage-stats opt-in
# (consumed by the --collect_usage_stats pipeline option).
python3 -m main \
  --runner DataflowRunner \
  --project "$1" \
  --gcp_project_id "$1" \
  --temp_location "gs://$2/tmp/" \
  --region "$3" \
  --setup_file ./setup.py \
  --template_location "gs://$2/templates/megalista" \
  --num_workers 1 \
  --autoscaling_algorithm=NONE \
  --service_account_email "$4" \
  --collect_usage_stats "$5"
echo "Copy megalista_metadata to bucket $2"
gsutil cp megalista_metadata "gs://$2/templates/megalista_metadata"
echo "Cleanup"
Expand Down
6 changes: 6 additions & 0 deletions megalista_dataflow/megalista_metadata
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@
"label": "Should show code lines in log messages?",
"help_text": "Should show code lines in log messages? True or False",
"is_optional": "true"
},
{
  "name": "collect_usage_stats",
  "label": "Collect crash/usage stats to improve the solution?",
  "help_text": "Share crash/usage stats that help us support the solution? True or False",
  "is_optional": "true"
}
]
}
7 changes: 6 additions & 1 deletion megalista_dataflow/models/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,9 @@ def _add_argparse_args(cls, parser):
# Opt-out flag controlling whether code lines are shown in log messages.
parser.add_value_provider_argument(
    '--show_code_lines_in_log',
    default=False,
    help='Should show code lines in log messages. True or False')
# Opt-in flag for anonymous crash/usage stats; read at the end of the
# pipeline (PrintResultsDoFn.check_stats) — the string "False" disables it.
parser.add_value_provider_argument(
    '--collect_usage_stats',
    default=True,
    help='Opt in to collect usage stats. True or False. This helps us support the solution')
1 change: 1 addition & 0 deletions megalista_dataflow/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ typed-ast==1.5.5
google_auth==2.23.4
fasteners==0.19
google-apitools==0.5.32
tadau
# Test deps
mypy==1.6.1
pytest==7.2.2
Expand Down
14 changes: 7 additions & 7 deletions megalista_dataflow/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@

# Packaging for the megalista_dataflow pipeline (used by --setup_file when
# building the Dataflow template).
# NOTE(review): the `tadau` package was added to requirements.txt but is not
# listed here — confirm whether install_requires should include it.
setuptools.setup(
    name='megalista_dataflow',
    version='5',
    author='Google',
    author_email='[email protected]',
    url='https://github.com/google/megalista/',
    install_requires=['google-ads-megalista',
                      'google-api-python-client==2.106.0',
                      'google-cloud-bigquery==3.13.0',
                      'aiohttp==3.8.6',
                      'google-cloud-storage==2.13.0',
                      'google-cloud-firestore==2.4.0',
                      'protobuf==3.20.3',
                      'pandas==2.1.2',
                      'boto3==1.28.78'],
    packages=setuptools.find_packages(),
)
119 changes: 93 additions & 26 deletions megalista_dataflow/steps/last_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,51 +12,118 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from distutils.log import Log
import datetime
import apache_beam as beam
import logging
from error.logging_handler import LoggingHandler
from config.version import MEGALISTA_VERSION
from models.execution import Execution
from .megalista_step import MegalistaStep
from config.logging import LoggingConfig
from steps.megalista_step import MegalistaStepParams


class LastStep(MegalistaStep):
    """Final pipeline step: flattens all execution outputs, deduplicates them
    by source->destination pair, and hands the result to PrintResultsDoFn
    (error summary + optional usage-stats reporting)."""

    def expand(self, executions):
        return (
            executions
            | beam.Flatten()
            | beam.CombineGlobally(CombineExecutionsFn())
            # self._params carries the dataflow options (incl. the stats opt-in).
            | beam.ParDo(PrintResultsDoFn(self._params))
        )


class CombineExecutionsFn(beam.CombineFn):
    """Combines Executions into a dict keyed by "source -> destination".

    Only the first Execution seen for each key is kept, so the output is a
    deduplicated mapping of every source/destination pair that ran.
    """

    def create_accumulator(self):
        # Accumulator: {"source_name -> destination_name": Execution}
        return {}

    def add_input(self, accumulator, input: Execution):
        key = f"{input.source.source_name} -> {input.destination.destination_name}"
        if key not in accumulator:
            accumulator[key] = input
        return accumulator

    def merge_accumulators(self, accumulators):
        # First accumulator to contribute a key wins; later duplicates dropped.
        merged = {}
        for accum in accumulators:
            for key, execution in accum.items():
                if key not in merged:
                    merged[key] = execution
        return merged

    def extract_output(self, accumulator):
        return accumulator


class PrintResultsDoFn(beam.DoFn):
    """Terminal DoFn: logs a summary of intercepted errors and, when the user
    opted in, reports anonymous crash/usage stats via Tadau."""

    def __init__(self, params: MegalistaStepParams):
        self._params = params

    def check_stats(self, statistics):
        """Best-effort upload of usage stats; must never raise.

        Skips entirely when the user opted out or there is nothing to report.

        Args:
            statistics: list of dicts with per-event fields; each is merged
                with client_id/name/version before being sent.
        """
        try:
            # NOTE(review): the opt-in is compared against the string "False" —
            # confirm how the ValueProvider renders boolean/True defaults.
            if (
                self._params._dataflow_options.collect_usage_stats != "False"
                and len(statistics) > 0
            ):
                # Imported lazily so opted-out runs never need the package.
                from tadau.measurement_protocol import Tadau

                Tadau().process(
                    [
                        {
                            # Pseudo-unique id from the current timestamp
                            # (10e3 == 1e4 multiplier, preserved as-is).
                            "client_id": f"{int(datetime.datetime.now().timestamp()*10e3)}",
                            "name": "Megalista",
                            "version": f"{MEGALISTA_VERSION}",
                            **stat,
                        }
                        for stat in statistics
                    ]
                )
        except Exception:
            # Stats reporting is strictly best-effort: swallow everything so
            # it can never break the pipeline.
            pass

    def process(self, executions):
        # `executions` is the dict produced by CombineExecutionsFn.
        logging_handler = LoggingConfig.get_logging_handler()

        if logging_handler is None:
            logging.getLogger("megalista").info(
                "Could not find error interception handler. Skipping error interception."
            )
        elif logging_handler.has_errors:
            logging.getLogger("megalista.LOG").error(
                f"SUMMARY OF ERRORS:\n{LoggingHandler.format_records(logging_handler.error_records)}"
            )

        # Runs stats silently (opt-in checked inside check_stats).
        try:
            self.check_stats(
                [
                    {
                        "action": "ran",
                        "solution": executed._destination._destination_type.name,
                        "target": executed._destination._destination_metadata[0],
                        "ads": executed._account_config.google_ads_account_id,
                        "cm": executed._account_config.campaign_manager_profile_id,
                        "ga": executed._account_config.google_analytics_account_id,
                    }
                    for executed in executions.values()
                ]
            )
            if logging_handler and logging_handler.has_errors:
                self.check_stats(
                    [
                        {
                            "action": "error",
                            "solution": record.name,
                            # Truncate messages so payloads stay small.
                            "message": record.message[:500],
                        }
                        for record in logging_handler.error_records
                    ]
                )
        except Exception:
            # Same best-effort contract as check_stats.
            pass

0 comments on commit fe76d26

Please sign in to comment.