-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #13 from airtai/implement-mailchimp-logic
Implement mailchimp logic
- Loading branch information
Showing
8 changed files
with
601 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
class Config: | ||
def __init__(self, dc: str, api_key: str): | ||
"""Initialize the Config object. | ||
Args: | ||
dc (str): The data center identifier for the Mailchimp API. | ||
api_key (str): The API key for accessing Mailchimp. | ||
""" | ||
self.base_url = f"https://{dc}.api.mailchimp.com/3.0" | ||
self.headers = { | ||
"Authorization": f"Bearer {api_key}", | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
from collections import defaultdict | ||
from datetime import datetime | ||
from typing import Literal | ||
|
||
import pandas as pd | ||
|
||
from ..config import Config | ||
from ..services.mailchimp_service import MailchimpService | ||
|
||
next_tag_map = { | ||
"M1": "M2", | ||
"M2": "M3", | ||
"M3": None, | ||
} | ||
|
||
|
||
def _create_add_and_remove_tags_dicts( | ||
members_with_tags_df: pd.DataFrame, | ||
) -> tuple[dict[str, list[str]], dict[str, list[str]]]: | ||
# keys are tags, values are list of member ids | ||
add_tag_members = defaultdict(list) | ||
remove_tag_members = defaultdict(list) | ||
|
||
for _, row in members_with_tags_df.iterrows(): | ||
member_id = row["id"] | ||
tags = row["tags"] | ||
for tag in tags: | ||
tag_name = tag["name"] | ||
if tag_name not in next_tag_map: | ||
continue | ||
|
||
next_tag = next_tag_map[tag_name] | ||
if next_tag is None: | ||
continue | ||
|
||
add_tag_members[next_tag].append(member_id) | ||
remove_tag_members[tag_name].append(member_id) | ||
|
||
return add_tag_members, remove_tag_members | ||
|
||
|
||
def _batch_update_tags( | ||
mailchimp_service: MailchimpService, | ||
list_id: str, | ||
tag_members: dict[str, list[str]], | ||
status: Literal["active", "inactive"], | ||
) -> None: | ||
for tag_name, member_ids in tag_members.items(): | ||
mailchimp_service.post_batch_update_members_tag( | ||
list_id=list_id, | ||
member_ids=member_ids, | ||
tag_name=tag_name, | ||
status=status, | ||
) | ||
if status == "active": | ||
# Add additional tag with the current date | ||
tag_name_with_date = f"{tag_name} - {datetime.now().strftime('%d.%m.%Y.')}" | ||
mailchimp_service.post_batch_update_members_tag( | ||
list_id=list_id, | ||
member_ids=member_ids, | ||
tag_name=tag_name_with_date, | ||
status=status, | ||
) | ||
|
||
|
||
def _add_and_remove_tags( | ||
mailchimp_service: MailchimpService, | ||
list_id: str, | ||
members_with_tags_df: pd.DataFrame, | ||
) -> tuple[dict[str, list[str]], dict[str, list[str]]]: | ||
add_tag_members, remove_tag_members = _create_add_and_remove_tags_dicts( | ||
members_with_tags_df=members_with_tags_df, | ||
) | ||
|
||
_batch_update_tags( | ||
mailchimp_service=mailchimp_service, | ||
list_id=list_id, | ||
tag_members=add_tag_members, | ||
status="active", | ||
) | ||
|
||
_batch_update_tags( | ||
mailchimp_service=mailchimp_service, | ||
list_id=list_id, | ||
tag_members=remove_tag_members, | ||
status="inactive", | ||
) | ||
|
||
return add_tag_members, remove_tag_members | ||
|
||
|
||
def update_tags( | ||
crm_df: pd.DataFrame, config: Config, list_name: str | ||
) -> tuple[dict[str, list[str]], dict[str, list[str]]]: | ||
"""Update tags for members in the CRM.""" | ||
# Create a Mailchimp service | ||
mailchimp_service = MailchimpService(config) | ||
|
||
# Get the list ID for the list name | ||
account_lists = mailchimp_service.get_account_lists() | ||
list_id = None | ||
for account_list in account_lists["lists"]: | ||
if account_list["name"] == list_name: | ||
list_id = account_list["id"] | ||
|
||
if list_id is None: | ||
raise ValueError(f"List {list_name} not found in account lists.") | ||
|
||
# Get the members with tags | ||
members_with_tags = mailchimp_service.get_members_with_tags(list_id) | ||
|
||
members_with_tags_df = pd.DataFrame(members_with_tags["members"]) | ||
members_with_tags_df.rename(columns={"email_address": "email"}, inplace=True) | ||
|
||
# filter only emails that are in the CRM | ||
crm_emails = crm_df["email"].unique() | ||
members_with_tags_df = members_with_tags_df[ | ||
members_with_tags_df["email"].isin(crm_emails) | ||
] | ||
|
||
add_tag_members, remove_tag_members = _add_and_remove_tags( | ||
mailchimp_service=mailchimp_service, | ||
list_id=list_id, | ||
members_with_tags_df=members_with_tags_df, | ||
) | ||
|
||
return add_tag_members, remove_tag_members |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
import json | ||
from typing import Any, Literal | ||
|
||
import requests | ||
from tenacity import retry, stop_after_attempt, wait_exponential | ||
|
||
from ..config import Config | ||
|
||
|
||
class MailchimpService: | ||
def __init__(self, config: Config) -> None: | ||
"""Initialize the MailchimpService with a configuration. | ||
Args: | ||
config (Config): The configuration object containing API details. | ||
""" | ||
self.config = config | ||
|
||
@retry( | ||
stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10) | ||
) | ||
def _mailchim_request_get(self, url: str) -> dict[str, list[dict[str, str]]]: | ||
response = requests.get(url, headers=self.config.headers, timeout=10) | ||
|
||
if response.status_code < 200 or response.status_code >= 300: | ||
# This automatically raises an HTTPError with details | ||
response.raise_for_status() | ||
|
||
return response.json() # type: ignore[no-any-return] | ||
|
||
def _mailchimp_request_post(self, url: str, body: dict[str, Any]) -> dict[str, Any]: | ||
response = requests.post( | ||
url, headers=self.config.headers, json=body, timeout=10 | ||
) | ||
|
||
# Check if the response is not 200-299 | ||
if response.status_code < 200 or response.status_code >= 300: | ||
# This automatically raises an HTTPError with details | ||
response.raise_for_status() | ||
|
||
return response.json() # type: ignore[no-any-return] | ||
|
||
def get_account_lists(self) -> dict[str, list[dict[str, str]]]: | ||
"""Get information about all lists in the account.""" | ||
url = f"{self.config.base_url}/lists?fields=lists.id,lists.name" | ||
|
||
return self._mailchim_request_get(url) | ||
|
||
def get_members_with_tags(self, list_id: str) -> dict[str, Any]: | ||
url = f"{self.config.base_url}/lists/{list_id}/members?fields=members.id,members.email_address,members.tags" | ||
|
||
return self._mailchim_request_get(url) | ||
|
||
def get_members(self, list_id: str) -> dict[str, list[dict[str, str]]]: | ||
url = f"{self.config.base_url}/lists/{list_id}/members?fields=members.email_address,members.id" | ||
|
||
return self._mailchim_request_get(url) | ||
|
||
def get_tags(self, list_id: str, member_id: str) -> dict[str, list[dict[str, str]]]: | ||
url = f"{self.config.base_url}/lists/{list_id}/members/{member_id}/tags?fields=tags.name" | ||
|
||
return self._mailchim_request_get(url) | ||
|
||
@retry( | ||
stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10) | ||
) | ||
def _post_batch_update_members_tag( | ||
self, | ||
list_id: str, | ||
member_ids: list[str], | ||
tag_name: str, | ||
status: Literal["active", "inactive"], | ||
) -> dict[str, str]: | ||
url = f"{self.config.base_url}/batches" | ||
body = { | ||
"operations": [ | ||
{ | ||
"method": "POST", | ||
"path": f"/lists/{list_id}/members/{member_id}/tags", | ||
"body": json.dumps( | ||
{"tags": [{"name": tag_name, "status": status}]} | ||
), | ||
} | ||
for member_id in member_ids | ||
] | ||
} | ||
return self._mailchimp_request_post(url, body) | ||
|
||
def post_batch_update_members_tag( | ||
self, | ||
list_id: str, | ||
member_ids: list[str], | ||
tag_name: str, | ||
status: Literal["active", "inactive"] = "active", | ||
) -> dict[str, str]: | ||
# Split member_ids into chunks of 200 | ||
for i in range(0, len(member_ids), 200): | ||
self._post_batch_update_members_tag( | ||
list_id, member_ids[i : i + 200], tag_name, status | ||
) | ||
return {"status": "success"} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.