Skip to content

Commit

Permalink
Merge pull request #7 from OpenG2P/1.0
Browse files Browse the repository at this point in the history
1.0
  • Loading branch information
venky-ganapathy authored Dec 9, 2024
2 parents c2742ea + d738f60 commit 6dd4642
Show file tree
Hide file tree
Showing 15 changed files with 197 additions and 46 deletions.
File renamed without changes.
64 changes: 64 additions & 0 deletions .github/workflows/beat-producers-tag.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
name: SR BG Tasks Beat Producers Tag Workflow

on:
push:
tags:
- "*"
branches-ignore:
- '*'
workflow_dispatch:

jobs:
publish-to-pypi:
name: Publish to PyPI For Beat Producers Tag
runs-on: ubuntu-latest
steps:
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.10"
- uses: actions/checkout@v3
- name: Install build dependencies
run: pip install build
- name: Build distribution
run: python -m build ./openg2p-sr-celery-beat-producers
- name: Publish
uses: pypa/gh-action-pypi-publish@release/v1
with:
password: ${{ secrets.PYPI_API_TOKEN }}
packages-dir: ./openg2p-sr-celery-beat-producers/dist/

docker-build-for-tag:
name: Docker Build and Push For Beat Producers Tag
needs: publish-to-pypi
runs-on: ubuntu-latest
env:
NAMESPACE: ${{ secrets.docker_hub_organisation || 'openg2p' }}
SERVICE_NAME: openg2p-sr-celery-beat-producers
steps:
- uses: actions/checkout@v3
- name: Docker build
run: |
echo "TAG_NAME=${GITHUB_REF#refs/tags/}" >> $GITHUB_ENV
IMAGE_ID=$NAMESPACE/$SERVICE_NAME
# Change all uppercase to lowercase
IMAGE_ID=$(echo $IMAGE_ID | tr '[A-Z]' '[a-z]')
VERSION=$TAG_NAME
echo IMAGE_ID=$IMAGE_ID
echo VERSION=$VERSION
echo IMAGE_ID=$IMAGE_ID >> $GITHUB_ENV
echo VERSION=$VERSION >> $GITHUB_ENV
docker build ./openg2p-sr-celery-beat-producers -f ./openg2p-sr-celery-beat-producers/Dockerfile-pypi \
--tag $IMAGE_ID:$VERSION
if [[ '${{ secrets.docker_hub_token }}' != '' && '${{ secrets.docker_hub_actor }}' != '' ]]; then
export DOCKER_PUSH="true"
echo DOCKER_PUSH=$DOCKER_PUSH >> $GITHUB_ENV
fi
- name: Docker push
if: env.DOCKER_PUSH == 'true'
run: |
echo "${{ secrets.docker_hub_token }}" | docker login -u ${{ secrets.docker_hub_actor }} --password-stdin
docker push ${{ env.IMAGE_ID }}:${{ env.VERSION }}
65 changes: 65 additions & 0 deletions .github/workflows/workers-tag.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
name: SR BG Tasks Celery Worker Tag Workflow

on:
push:
tags:
- "*"
branches-ignore:
- '*'
workflow_dispatch:

jobs:
publish-to-pypi:
name: Publish to PyPI For Workers Tag
runs-on: ubuntu-latest
steps:
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.10"
- uses: actions/checkout@v3
- name: Install build dependencies
run: pip install build
- name: Build distribution
run: python -m build ./openg2p-sr-celery-workers
- name: Publish
uses: pypa/gh-action-pypi-publish@release/v1
with:
password: ${{ secrets.PYPI_API_TOKEN }}
packages-dir: ./openg2p-sr-celery-workers/dist/


docker-build-for-tag:
name: Docker Build and Push For Workers Tag
needs: publish-to-pypi
runs-on: ubuntu-latest
env:
NAMESPACE: ${{ secrets.docker_hub_organisation || 'openg2p' }}
SERVICE_NAME: openg2p-sr-celery-workers
steps:
- uses: actions/checkout@v3
- name: Docker build
run: |
echo "TAG_NAME=${GITHUB_REF#refs/tags/}" >> $GITHUB_ENV
IMAGE_ID=$NAMESPACE/$SERVICE_NAME
# Change all uppercase to lowercase
IMAGE_ID=$(echo $IMAGE_ID | tr '[A-Z]' '[a-z]')
VERSION=$TAG_NAME
echo IMAGE_ID=$IMAGE_ID
echo VERSION=$VERSION
echo IMAGE_ID=$IMAGE_ID >> $GITHUB_ENV
echo VERSION=$VERSION >> $GITHUB_ENV
docker build ./openg2p-sr-celery-workers -f ./openg2p-sr-celery-workers/Dockerfile-pypi \
--tag $IMAGE_ID:$VERSION
if [[ '${{ secrets.docker_hub_token }}' != '' && '${{ secrets.docker_hub_actor }}' != '' ]]; then
export DOCKER_PUSH="true"
echo DOCKER_PUSH=$DOCKER_PUSH >> $GITHUB_ENV
fi
- name: Docker push
if: env.DOCKER_PUSH == 'true'
run: |
echo "${{ secrets.docker_hub_token }}" | docker login -u ${{ secrets.docker_hub_actor }} --password-stdin
docker push ${{ env.IMAGE_ID }}:${{ env.VERSION }}
2 changes: 1 addition & 1 deletion openg2p-sr-celery-beat-producers/Dockerfile-git
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ RUN python3 -m venv venv \
RUN python3 -m pip install \
git+https://github.com/openg2p/[email protected]\#subdirectory=openg2p-fastapi-common \
git+https://github.com/openg2p/[email protected]\#subdirectory=openg2p-fastapi-auth \
git+https://github.com/OpenG2P/openg2p-social-registry-bg-tasks@develop\#subdirectory=openg2p-sr-models \
git+https://github.com/OpenG2P/openg2p-social-registry-bg-tasks@1.0\#subdirectory=openg2p-sr-models \
./src

USER ${container_user}
Expand Down
6 changes: 3 additions & 3 deletions openg2p-sr-celery-beat-producers/Dockerfile-pypi
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ ADD --chown=${container_user}:${container_user_group} main.py /app
RUN python3 -m venv venv \
&& . ./venv/bin/activate
RUN python3 -m pip install \
openg2p-fastapi-common==1.1.0 \
openg2p-fastapi-auth==1.1.0 \
openg2p-fastapi-common==1.1.1 \
openg2p-fastapi-auth==1.1.1 \
openg2p-sr-models==1.0.0 \
openg2p-sr-celery-beat-producers
openg2p-sr-celery-beat-producers==1.0.0

USER ${container_user}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,5 @@ class Settings(BaseSettings):

res_partner_id_generation_frequency: int = 10
res_partner_id_update_frequency: int = 10

batch_size: int = 10000
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def id_generation_request_beat_producer():
).update(
{
G2PQueIDGeneration.id_generation_request_status: IDGenerationRequestStatus.FAILED,
G2PQueIDGeneration.last_attempt_datetime: datetime.utcnow(),
G2PQueIDGeneration.last_attempt_datetime_request: datetime.utcnow(),
},
synchronize_session=False,
)
Expand All @@ -37,12 +37,14 @@ def id_generation_request_beat_producer():
# Select entries that are PENDING for request status and have not exceeded max attempts
pending_request_entries = (
session.execute(
select(G2PQueIDGeneration).filter(
select(G2PQueIDGeneration)
.filter(
G2PQueIDGeneration.id_generation_request_status
== IDGenerationRequestStatus.PENDING,
G2PQueIDGeneration.number_of_attempts_request
< _config.max_id_generation_request_attempts,
)
.limit(_config.batch_size)
)
.scalars()
.all()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def id_generation_update_beat_producer():
).update(
{
G2PQueIDGeneration.id_generation_update_status: IDGenerationUpdateStatus.FAILED,
G2PQueIDGeneration.last_attempt_datetime: datetime.utcnow(),
G2PQueIDGeneration.last_attempt_datetime_update: datetime.utcnow(),
},
synchronize_session=False,
)
Expand All @@ -41,14 +41,16 @@ def id_generation_update_beat_producer():
# Select entries that have COMPLETED request and PENDING update status
pending_update_entries = (
session.execute(
select(G2PQueIDGeneration).filter(
select(G2PQueIDGeneration)
.filter(
G2PQueIDGeneration.id_generation_request_status
== IDGenerationRequestStatus.COMPLETED,
G2PQueIDGeneration.id_generation_update_status
== IDGenerationUpdateStatus.PENDING,
G2PQueIDGeneration.number_of_attempts_update
< _config.max_id_generation_update_attempts,
)
.limit(_config.batch_size)
)
.scalars()
.all()
Expand Down
18 changes: 14 additions & 4 deletions openg2p-sr-celery-workers/.env.example
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
AUTH_URL=https://keycloak.openg2p.org/realms/master/protocol/openid-connect/token
AUTH_CLIENT_ID=client_id
AUTH_CLIENT_SECRET=client_secret
AUTH_GRANT_TYPE=client_credentials
SR_CELERY_WORKERS_DB_DBNAME: socialregistrydb
SR_CELERY_WORKERS_DB_USERNAME: socialregistryuser
SR_CELERY_WORKERS_DB_HOSTNAME: '172.29.8.235'
SR_CELERY_WORKERS_DB_PORT: 5432
SR_CELERY_WORKERS_CELERY_BROKER_URL: redis://{{ .Release.Name }}-redis-master:6379/0
SR_CELERY_WORKERS_CELERY_BACKEND_URL: redis://{{ .Release.Name }}-redis-master:6379/0
SR_CELERY_WORKERS_AUTH_URL: https://keycloak.openg2p.net/realms/master/protocol/openid-connect/token
SR_CELERY_WORKERS_AUTH_CLIENT_ID: openg2p-sr-loadtest
SR_CELERY_WORKERS_AUTH_CLIENT_SECRET: ""
SR_CELERY_WORKERS_AUTH_GRANT_TYPE: client_credentials
SR_CELERY_WORKERS_MOSIP_GET_UIN_URL: https://idgenerator.loadtest.openg2p.net/v1/idgenerator/uin
SR_CELERY_WORKERS_MOSIP_UPDATE_UIN_URL: https://idgenerator.loadtest.openg2p.net/v1/idgenerator/uin
SR_CELERY_WORKERS_MAX_ID_GENERATION_REQUEST_ATTEMPTS: 3
SR_CELERY_WORKERS_MAX_ID_GENERATION_UPDATE_ATTEMPTS: 3
2 changes: 1 addition & 1 deletion openg2p-sr-celery-workers/Dockerfile-git
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ RUN python3 -m venv venv \
RUN python3 -m pip install \
git+https://github.com/openg2p/[email protected]\#subdirectory=openg2p-fastapi-common \
git+https://github.com/openg2p/[email protected]\#subdirectory=openg2p-fastapi-auth \
git+https://github.com/OpenG2P/openg2p-social-registry-bg-tasks@develop\#subdirectory=openg2p-sr-models \
git+https://github.com/OpenG2P/openg2p-social-registry-bg-tasks@1.0\#subdirectory=openg2p-sr-models \
./src

USER ${container_user}
Expand Down
8 changes: 4 additions & 4 deletions openg2p-sr-celery-workers/Dockerfile-pypi
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ ADD --chown=${container_user}:${container_user_group} main.py /app
RUN python3 -m venv venv \
&& . ./venv/bin/activate
RUN python3 -m pip install \
openg2p-fastapi-common==develop \
openg2p-fastapi-auth==develop \
openg2p-sr-models==develop \
openg2p-sr-celery-workers
openg2p-fastapi-common==1.1.1 \
openg2p-fastapi-auth==1.1.1 \
openg2p-sr-models==1.0.0 \
openg2p-sr-celery-workers==1.0.0

USER ${container_user}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,29 +65,27 @@ def id_generation_request_worker(registrant_id: str):
if not uin:
raise Exception("UIN not received from MOSIP")

# Update res_partner.ref_id with the MOSIP Generated ID
# Update res_partner.unique_id with the MOSIP Generated ID
res_partner = (
session.query(ResPartner)
.filter(ResPartner.registrant_id == registrant_id)
.first()
session.query(ResPartner).filter(ResPartner.id == registrant_id).first()
)

if not res_partner:
raise Exception(
f"No res_partner entry found for registrant_id: {registrant_id}"
)

# Check if the UIN is already present in res_partner.ref_id
# Check if the UIN is already present in res_partner.unique_id
existing_partner_with_uin = (
session.query(ResPartner).filter(ResPartner.ref_id == uin).first()
session.query(ResPartner).filter(ResPartner.unique_id == uin).first()
)

if existing_partner_with_uin:
raise Exception(
f"MOSIP ID {uin} is already present in res_partner.ref_id"
f"MOSIP ID {uin} is already present in res_partner.unique_id"
)

res_partner.ref_id = uin
res_partner.unique_id = uin
session.commit()

# Update queue entry statuses
Expand All @@ -96,7 +94,7 @@ def id_generation_request_worker(registrant_id: str):
IDGenerationRequestStatus.COMPLETED
)
queue_entry.id_generation_update_status = IDGenerationUpdateStatus.PENDING
queue_entry.last_attempt_datetime = datetime.utcnow()
queue_entry.last_attempt_datetime_request = datetime.utcnow()
queue_entry.last_attempt_error_code_request = None
session.commit()

Expand All @@ -110,7 +108,7 @@ def id_generation_request_worker(registrant_id: str):

if queue_entry:
queue_entry.number_of_attempts_request += 1
queue_entry.last_attempt_datetime = datetime.utcnow()
queue_entry.last_attempt_datetime_request = datetime.utcnow()
queue_entry.last_attempt_error_code_request = str(e)
if (
queue_entry.number_of_attempts_request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,14 @@ def id_generation_update_worker(registrant_id: str):

# Fetch res_partner to get the UIN
res_partner = (
session.query(ResPartner)
.filter(ResPartner.registrant_id == registrant_id)
.first()
session.query(ResPartner).filter(ResPartner.id == registrant_id).first()
)

if not res_partner or not res_partner.ref_id:
if not res_partner or not res_partner.unique_id:
raise Exception(
f"No UIN found for registrant_id: {registrant_id} in res_partner"
)

uin = res_partner.ref_id

# Get OIDC token
access_token = OAuthTokenService.get_component().get_oauth_token()
_logger.info("Received access token")
Expand All @@ -64,41 +60,52 @@ def id_generation_update_worker(registrant_id: str):
"Cookie": f"Authorization={access_token}",
"Accept": "application/json",
}
current_datetime = datetime.utcnow()
formatted_datetime = (
current_datetime.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
)

# Call MOSIP Update UIN API to update status
update_payload = {
"id": "string",
"metadata": {},
"request": {"uin": uin, "status": "ASSIGNED"},
"requesttime": datetime.utcnow().isoformat(),
"request": {"uin": res_partner.unique_id, "status": "ASSIGNED"},
"requesttime": formatted_datetime,
"version": "string",
}
response = httpx.put(
_config.mosip_update_uin_url, json=update_payload, headers=headers
)
_logger.info(
f"Received response from MOSIP Update UIN API: {response.text}"
)
if response.status_code != 200:
raise Exception(
f"MOSIP Update UIN API call failed with status code {response.status_code}"
)

# Update queue entry statuses
# Status code is 200
if response.json().get("errors"):
raise Exception(
f"MOSIP Update UIN API call failed with error: {response.json().get('errors')}"
)

# Status is 200 and No errors then update queue entry statuses
queue_entry.number_of_attempts_update += 1
queue_entry.id_generation_update_status = IDGenerationUpdateStatus.COMPLETED
queue_entry.last_attempt_datetime = datetime.utcnow()
queue_entry.last_attempt_datetime_update = datetime.utcnow()
queue_entry.last_attempt_error_code_update = None
session.commit()

_logger.info(
f"ID generation update completed for registrant_id: {registrant_id}"
)
_logger.info(f"Mosip update completed for registrant_id: {registrant_id}")

except Exception as e:
error_message = f"Error during ID generation update for registrant_id {registrant_id}: {str(e)}"
_logger.error(error_message)

if queue_entry:
queue_entry.number_of_attempts_update += 1
queue_entry.last_attempt_datetime = datetime.utcnow()
queue_entry.last_attempt_datetime_update = datetime.utcnow()
queue_entry.last_attempt_error_code_update = str(e)
if (
queue_entry.number_of_attempts_update
Expand Down
Loading

0 comments on commit 6dd4642

Please sign in to comment.