Skip to content

Commit

Permalink
Documentation, Dockerfile, cleaning
Browse files Browse the repository at this point in the history
* Improved documentation of some components
* Optimization of the Dockerfile
* Deletion of unused old scripts
sv-giampa committed Dec 15, 2023
1 parent 66760c2 commit 8ba65a1
Showing 8 changed files with 317 additions and 231 deletions.
19 changes: 12 additions & 7 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -3,17 +3,22 @@ FROM ubuntu:20.04
USER root
WORKDIR /root

# RUN alias apt_install="DEBIAN_FRONTEND=noninteractive apt update -y && DEBIAN_FRONTEND=noninteractive apt install -y"
# RUN alias apt_clean="apt clean && rm -rf /var/cache/apt/archives /var/lib/apt/lists/*"

COPY --chmod=777 apt_install /apt_install

# install utility software packages
RUN DEBIAN_FRONTEND=noninteractive apt update -y && apt install -y software-properties-common&& rm -rf /var/cache/apt/archives /var/lib/apt/lists/*
RUN DEBIAN_FRONTEND=noninteractive apt update -y && apt install -y inetutils-ping net-tools wget && rm -rf /var/cache/apt/archives /var/lib/apt/lists/*
RUN DEBIAN_FRONTEND=noninteractive apt update -y && apt install -y htop screen zip nano && rm -rf /var/cache/apt/archives /var/lib/apt/lists/*
RUN /apt_install software-properties-common
RUN /apt_install inetutils-ping net-tools wget
RUN /apt_install htop screen zip nano

# install and configure git
RUN DEBIAN_FRONTEND=noninteractive apt update -y && apt install -y git && rm -rf /var/cache/apt/archives /var/lib/apt/lists/*
RUN /apt_install git
RUN DEBIAN_FRONTEND=noninteractive git config --global commit.gpgsign false

# configure ssh daemon
RUN DEBIAN_FRONTEND=noninteractive apt update -y && apt install -y openssh-server && rm -rf /var/cache/apt/archives /var/lib/apt/lists/*
RUN /apt_install openssh-server
RUN if ! [ -d /var/run/sshd ]; then mkdir /var/run/sshd; fi
RUN echo 'root:password!!' | chpasswd
RUN sed -i 's/^[# ]*PermitRootLogin .*$/PermitRootLogin yes/g' /etc/ssh/sshd_config
@@ -41,9 +46,9 @@ RUN echo 'export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/' >> ~/.bashrc
RUN DEBIAN_FRONTEND=noninteractive apt update -y && apt install -y python3 python3-pip python3-dev && rm -rf /var/cache/apt/archives /var/lib/apt/lists/*

# install PyCOMPSs
RUN DEBIAN_FRONTEND=noninteractive apt update -y && apt install -y graphviz xdg-utils libtool automake build-essential \
RUN /apt_install graphviz xdg-utils libtool automake build-essential \
python python-dev libpython2.7 libboost-serialization-dev libboost-iostreams-dev libxml2 libxml2-dev csh gfortran \
libgmp3-dev flex bison texinfo libpapi-dev && rm -rf /var/cache/apt/archives /var/lib/apt/lists/*
libgmp3-dev flex bison texinfo libpapi-dev
RUN python3 -m pip install --upgrade pip setuptools
RUN python3 -m pip install dill guppy3
RUN python3 -m pip install "pycompss==3.1" -v
13 changes: 13 additions & 0 deletions apt_install
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/bash

# script used for installing software through apt in Dockerfiles, avoiding layer cache and size problems

# update packages lists
DEBIAN_FRONTEND=noninteractive apt update -y

# install required software
DEBIAN_FRONTEND=noninteractive apt install -y $@

# clean apt cache and lists
DEBIAN_FRONTEND=noninteractive apt clean
rm -rf /var/cache/apt/archives /var/lib/apt/lists/*
57 changes: 0 additions & 57 deletions parsoda/function/analysis/parallel_fp_growth.py

This file was deleted.

38 changes: 0 additions & 38 deletions parsoda/function/analysis/sequential_fp_growth.py

This file was deleted.

33 changes: 19 additions & 14 deletions parsoda/model/driver/parsoda_driver.py
Original file line number Diff line number Diff line change
@@ -30,15 +30,13 @@ def init_environment(self) -> None:
def set_num_partitions(self, num_partitions: int) -> None:
"""
Sets the number of data partitions
:return: None
"""
pass

@abstractmethod
def set_chunk_size(self, chunk_size: int) -> None:
"""
Sets the size of data partitions in bytes
:return: None
"""
pass

@@ -55,53 +53,60 @@ def crawl(self, crawler: List[Crawler]) -> None:
After invoking this function the implementor should hold a representation of an initial dataset
(e.g., on Spark a new RDD is populated with the SocialDataItem objects provided by crawlers)
:return: None
"""
pass

@abstractmethod
def filter(self, filter_func: Callable[[Any], bool]) -> None:
"""
Applies the given filter to the current dataset, dropping all items that does not satisfy the filter
:param filter_func: the filter to apply
:return: None
Args:
filter_func: the filter to apply
"""
pass

@abstractmethod
def flatmap(self, mapper: Callable[[Any], Iterable[Any]]) -> None:
"""
Executes a mapping of each item to a list of custom key-value pairs, represented as tuples of two elements each
:param mapper: the (object -> list[(K,V)]) mapping function to apply
:return: None
Args:
mapper: the (object -> list[(K,V)]) mapping function to apply
"""
pass

def map(self, mapper: Callable[[Any], Any]) -> None:
"""
Executes a mapping of each item in the current dataset to a new object
:param mapper: the (object -> list[(K,V)]) mapping function to apply
:return: None
Executes a mapping of each item in the current dataset to a new object.
Args:
mapper: the (object -> list[(K,V)]) mapping function to apply
"""
self.flatmap(_flatmapper(mapper))

#TODO: documentation
def group_by_key(self) -> None:
"""Assumes that the current dataset is a bulk of key-value pairs and creates a new dataset which groups all the items with the same key. The new dataset will be a bulk of (key)-(list-of-values) pairs.
"""Assumes that the current dataset is a bulk of key-value pairs
and creates a new dataset which groups all the items with the same key.
The new dataset will be a bulk of (key)-(list-of-values) pairs.
"""
pass

def get_result(self) -> Any:
"""
Gets the current dataset
:return: the current dataset
Returns:
Any: the current dataset
"""
pass

@abstractmethod
def dispose_environment(self) -> None:
"""
Disposes instantiated resources of the underlying environment, after executing the ParSoDA application, in order to reuse this driver as a new fresh driver that should be re-initialized
:return: None
Disposes instantiated resources of the underlying environment,
after executing the ParSoDA application, in order to reuse
this driver as a new fresh driver that should be re-initialized
"""
pass
111 changes: 109 additions & 2 deletions parsoda/model/social_data_app.py
Original file line number Diff line number Diff line change
@@ -208,21 +208,54 @@ def __init__(self, app_name: str, driver: ParsodaDriver, num_partitions=None, ch
def set_num_partitions(self, num_partitions: int):
"""
Sets the number of partitions. This is overriden by the chunk size if it is set.
Args:
num_partitions (int): The wanted partitions number per crawler
Returns:
SocialDataApp: this SocialDataApp instance
"""
self.__num_partitions = num_partitions
return self

def set_chunk_size(self, chunk_size: int):
"""
Sets the data chunk size in megabytes. This parameter overrides the number of partitions.
Args:
chunk_size (int): The wanted partition size
Returns:
SocialDataApp: this SocialDataApp instance
"""
self.__chunk_size = chunk_size
return self

def set_report_file(self, filename: str):
"""
Sets the file name of the ParSoDA report
Args:
filename (str): The file name where the report will be saved
Returns:
SocialDataApp: this SocialDataApp instance
"""
self.__report_file = filename
return self

def set_crawlers(self, crawlers: List[Crawler]):
"""Sets the list of crawlers to be used for loading data
Args:
crawlers (List[Crawler]): A list of Crawler objects
Raises:
Exception: if no crawler is given
Returns:
SocialDataApp: this SocialDataApp instance
"""
if crawlers is None or len(crawlers) == 0:
raise Exception("No crawler given")
self.__crawlers = []
@@ -231,6 +264,17 @@ def set_crawlers(self, crawlers: List[Crawler]):
return self

def set_filters(self, filters: List[Filter]):
"""Sets the filters to be applied to data
Args:
filters (List[Filter]): the list of filters
Raises:
Exception: if no filter is given
Returns:
SocialDataApp: this SocialDataApp instance
"""
if filters is None or len(filters) == 0:
raise Exception("No filter given")
self.__filters = []
@@ -239,38 +283,101 @@ def set_filters(self, filters: List[Filter]):
return self

def set_mapper(self, mapper: Mapper[K, V]):
"""Sets the mapper to be used in Map-Reduce step
Args:
mapper (Mapper[K, V]): the mapper
Raises:
Exception: if no mapper is given
Returns:
SocialDataApp: this SocialDataApp instance
"""
if mapper is None:
raise Exception("No mapper given")
self.__mapper = mapper
return self

def set_secondary_sort_key(self, key_function: Callable[[V], SORTABLE_KEY]):
"""Sets a key-function to be used for secondary sort step. If no key-function is set, the secondary sort will not be executed.
A key object returned by the specified key-function must be a sortable object, i.e. an object that can be compared to other objects of the same type.
Args:
key_function (Callable[[V], SORTABLE_KEY]): a callable object which maps an item to a key.
Raises:
Exception: if no key-function is given
Returns:
SocialDataApp: this SocialDataApp instance
"""
if key_function is None:
raise Exception("No key function given")
self.__secondary_sort_key_function = key_function
return self

def set_reducer(self, reducer: Reducer[K, V, R]):
"""Sets the reducer to be used in Map-Reduce step.
Args:
reducer (Reducer[K, V, R]): the reducer
Raises:
Exception: if no reducer is given
Returns:
SocialDataApp: this SocialDataApp instance
"""
if reducer is None:
raise Exception("No reducer given")
self.__reducer = reducer
return self

def set_analyzer(self, analyzer: Analyzer[K, R, A]):
"""Sets an optional analysis function.
This could be a function that creates a new SocialDataApp instance and could use the same driver given in the current one.
Args:
analyzer (Analyzer[K, R, A]): the analyzer
Raises:
Exception: if no analyzer is given
Returns:
SocialDataApp: this SocialDataApp instance
"""
if analyzer is None:
raise Exception("No analyzer given")
self.__analyzer = analyzer
return self

def set_visualizer(self, visualizer: Visualizer[A]):
"""Set an optional function for the visualization step (e.g., a function that writes results to a file)
Args:
visualizer (Visualizer[A]): the visualizer
Raises:
Exception: if no analyzer is given
Returns:
SocialDataApp: this SocialDataApp instance
"""
if visualizer is None:
raise Exception("No visualizer given")
self.__visualizer = visualizer
return self

def execute(self) -> ParsodaReport:
#locale.setlocale(locale.LC_ALL, "en_US.utf8")
"""Runs the application and returns a report about its execution.
Raises:
Exception: if some preliminary check fails (e.g. no crawlers are set) or some ParSoDA step fails during the execution.
Returns:
ParsodaReport: the execution report
"""
# Check application components
if self.__crawlers is None or len(self.__crawlers) == 0:
raise Exception("No crawler is set")
@@ -296,7 +403,7 @@ def execute(self) -> ParsodaReport:
reducer = self.__reducer
secondary_key = self.__secondary_sort_key_function

# Staart ParSoDA workflow, initialize driver
# Start ParSoDA workflow, initialize driver
print(f"[ParSoDA/{self.__app_name}] initializing driver: {type(self.__driver).__name__}")
driver.set_chunk_size(self.__chunk_size*1024*1024)
driver.set_num_partitions(self.__num_partitions)
167 changes: 164 additions & 3 deletions parsoda/model/social_data_item.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
from datetime import datetime
import json
from typing import Optional, List, Dict
from typing import Optional, List, Dict, final

from parsoda.utils.json_serializer import obj_to_json, obj_from_json


@final
class ItemPostTime:
"""
Class used for representing the time of posts
"""
year: int = None
month: int = None
day: int = None
@@ -22,6 +26,11 @@ def __init__(self, other: datetime):
self.second = other.second

def to_datetime(self) -> datetime:
"""Convert the object to a standard datetime object
Returns:
datetime: the datetime object built
"""
return datetime(
self.year,
self.month,
@@ -41,17 +50,28 @@ def __eq__(self, other) -> bool:
self.second == other.second

def to_json(self) -> str:
"""Serialize the item to JSON.
Returns:
str: JSON representation of this item
"""
json_dict = {}
json_dict["timestamp"] = self.to_datetime().timestamp()
return json.dumps(json_dict)

@staticmethod
def from_json(json_str: str):
"""Deserialize a social data item from JSON.
Returns:
SocialDataItem: the deserialized data item
"""
timestamp = int(json.loads(json_str)['timestamp'])
dt = datetime.fromtimestamp(timestamp)
return ItemPostTime(dt)


@final
class ItemLocation:
latitude: float
longitude: float
@@ -66,7 +86,11 @@ def __eq__(self, other) -> bool:
self.longitude == other.longitude


@final
class SocialDataItem:
"""Class for defining ParSoDA-Py's standard representantion of social data items.
An instance of this class can be serialized as a JSON
"""

def __init__(self):
self.id = ''
@@ -81,27 +105,67 @@ def __init__(self):
self.original_format: str = '<unknown>'

def has_user_id(self):
"""Check if this item has a user ID.
Returns:
Boolean: true if the items has a user ID, false otherwise
"""
return self.user_id is not None and self.user_id != ''

def has_user_name(self):
"""Check if this item has a user name.
Returns:
Boolean: true if the items has a user name, false otherwise
"""
return self.user_name is not None and self.user_name != ''

def has_text(self):
"""Check if this item has text.
Returns:
Boolean: true if the items has text, false otherwise
"""
return self.text is not None and self.text != ''

def has_tags(self) -> bool:
"""Check if this item has tags.
Returns:
Boolean: true if the items has tags, false otherwise
"""
return len(self.tags) > 0

def has_extras(self) -> bool:
"""Check if this item has extra data.
Returns:
Boolean: true if the items has extra data, false otherwise.
"""
return len(self.extras) > 0

def has_date_posted(self) -> bool:
"""Check if this item has posting date.
Returns:
Boolean: true if the items has posting date, false otherwise.
"""
return self.date_posted is not None

def has_location(self) -> bool:
"""Check if this item has a location.
Returns:
Boolean: true if the items has a location, false otherwise
"""
return self.location is not None

def to_json(self) -> str:
"""Serialize the item to JSON.
Returns:
str: JSON representation of this item
"""
json_dict = {}
json_dict['id'] = self.id
json_dict['user_id'] = self.user_id
@@ -118,6 +182,11 @@ def to_json(self) -> str:

@staticmethod
def from_json(json_string: str):
"""Deserialize a social data item from JSON.
Returns:
SocialDataItem: the deserialized data item
"""
self = SocialDataItem()
json_dict = json.loads(json_string)

@@ -158,6 +227,8 @@ def __repr__(self) -> str:


class SocialDataItemBuilder:
"""Class used for building new social data items
"""

def __init__(self):
self.item = SocialDataItem()
@@ -173,57 +244,147 @@ def build(self) -> SocialDataItem:
return built

def set_id(self, id: str):
"""Sets the item ID
Args:
id (str): the item ID
Returns:
SocialDataItemBuilder: this builder
"""
self.item.id = str(id)
return self

def set_user_id(self, user_id: str):
"""Sets the user ID
Args:
id (str): the user ID
Returns:
SocialDataItemBuilder: this builder
"""
self.item.user_id = str(user_id)
return self

def set_user_name(self, user_name: str):
"""Sets the user name.
Args:
user_name (str): the user name.
Returns:
SocialDataItemBuilder: this builder
"""
self.item.user_name = str(user_name)
return self

def set_date_posted(self, date_posted: datetime):
"""Sets the posting date.
Args:
id (str): the posting date
Returns:
SocialDataItemBuilder: this builder
"""
self.item.date_posted = ItemPostTime(date_posted)
return self

def set_text(self, text: str):
"""Sets the item text.
Args:
id (str): the item text
Returns:
SocialDataItemBuilder: this builder
"""
self.item.text = str(text)
return self

def set_tags(self, tags: List):
"""Sets the list of tags
Args:
id (str): the list of tags
Returns:
SocialDataItemBuilder: this builder
"""
self.item.tags = tags
return self

def set_location(self, latitude: float, longitude: float):
"""Sets the item location.
Args:
id (str): the item location
Returns:
SocialDataItemBuilder: this builder
"""
self.item.location = ItemLocation(latitude, longitude)
return self

def put_extra(self, key, value):
"""Add extra data to the item
Args:
key (str): the data key
value: the value data
Returns:
SocialDataItemBuilder: this builder
"""
self.item.extras[key] = value
return self

def del_extra(self, key):
"""Delete an extra value.
Args:
key (str): the key of the data value to remove
Returns:
SocialDataItemBuilder: this builder
"""
del self.item.extras[key]
return self

def clear_extras(self):
"""Clears the extra data
Returns:
SocialDataItemBuilder: this builder
"""
self.item.extras.clear()
return self

def set_extras(self, extras: dict):
"""
Sets all extras at once from a standard dictionary.
Invoking this method all the extras previously set are deleted.
:param extras: dictionary
:return:
Args:
extras (str): the extra dictionary
Returns:
SocialDataItemBuilder: this builder
"""
self.item.extras = {}
for key in extras:
self.item.extras[key] = extras[key]
return self

def set_original_format(self, original_format: str):
"""Sets a name for the item original format
Args:
original_format (str): a name for the original format
Returns:
SocialDataItemBuilder: this builder
"""
self.item.original_format = original_format
return self
110 changes: 0 additions & 110 deletions parsoda/utils/generic_utils.py

This file was deleted.

0 comments on commit 8ba65a1

Please sign in to comment.