Bu Case çalışmasında iki ayrı görev için iki ayrı Job yazılmıştır. Kodlar Scala ile geliştirilmiş ve SQL kullanılmamıştır. Spark, bir master iki worker olacak şekilde docker ile deploy edilmiştir.
orders
|-- customer_id: string
|-- location: string
|-- seller_id: string
|-- order_date: string
|-- order_id: string
|-- price: double
|-- product_id: string
|-- status: string
products
|-- brandname: string
|-- categoryname: string
|-- productid: string
|-- productname: string
Her ürün için aşağıdakileri hesaplayın ve dosya sistemine sonuçları yazın:
- net satış adedi ve tutarı (toplam satış adetleri - iptal / iade adetleri)
- brüt satış adedi ve tutarı (toplam satış adetleri)
- satıldığı son 5 gündeki satış sayısı ortalamaları
- en çok sattığı yer
Ürün bazlı fiyat değişimlerini tarihleriyle birlikte çıkarıp dosya sistemine yazın.
Burada orders
verisi üzerinde aynı product_id
'nin sonraki order'da fiyatı arttıysa rise
, azaldıysa fall
gibi bir çıktı üretebilirsiniz.
Bu bölümde, Docker imajlarını otomatik olarak başlatarak iki görevin de tamamlanmasını ve çıktıların dosya sistemine yazılmasını sağlayacağız.
Gereklilikler
-
Bilgisayarınızda Docker'ın kurulu olduğundan emin olun.
-
Bilgisayarınızda 8080 ve 7077 portlarının boşta ve kullanılabilir olduğundan emin olun.
İşlemlerin başlayabilmesi için start.sh dosyasını çalıştırmanız gerekiyor. Bu dosyadaki komutlar Linux komutları içerdiği için, Debian(Ubuntu) tabanlı bir işletim sistemini tercih etmeniz önerilir. Eğer Windows kullanıyorsanız, lütfen dosya içindeki komutları tek tek elle çalıştırın.
# işlemlerin başlaması
sudo bash ./start.sh
Bu dosyanın çalıştırılmasıyla birlikte, sırasıyla şu adımlar gerçekleştirilecek:
1. Eğer mevcut bir imaj çalışıyorsa, bu imaj durdurulacak.
2. out-data klasörü temizlenecek, yani içindeki dosyalar silinecek.
3. spark-network adında bir ağ oluşturulacak.
4. İmajlar yeniden oluşturulacak (build) ve ayağa kaldırılacak.
konteynerların ayağa kalkması
Ardından, Spark master ve iki worker ayağa kaldırılacak. Spark işlemleri tamamlandıktan sonra, spark-submit adlı konteyner, joblarımızın jar dosyalarını çalıştıracak ve jobların başarıyla tamamlanmasını bekleyecektir. Spark UI'ı görüntülemek için tarayıcınızı açın ve 0.0.0.0:8080 adresine gidin.
Spark kaynak yönetimi
-
Spark Master CPU:8, RAM: 16
-
Spark Worker-1 CPU:8, RAM: 8
-
SPARK Worker-2 CPU:8, RAM: 8
Spark UI 0.0.0.0:8080
Joblar başarıyla tamamlandıktan sonra, worker1 ve worker2 üzerinden elde edilen çıktılar alınacak ve bu çıktılar out_data klasörüne taşınacaktır.
Tüm bu işlemler, genellikle 2-3 dakika içerisinde tamamlanır ve çıktı dosyalarımız kullanılabilir hale gelir.
Çıktılar
Çıktı isimleri değişiklik gösterebilir
Çözümde Spark'ın 3.5.0 sürümü, Java'nın 8 sürümü ve Scala'nın 2.12.8 sürümü kullanılmıştır. Spark kütüphanesini bağımlılıklar için çözüm sürecinde Maven kullanılmıştır.
Job1
Spark, myPackage.Job1 sınıfını çalıştırır. Bu sınıf içinde, iki dosya okunduktan sonra Transactions sınıfında dosyalar birleştirilir, işlemler tamamlanır ve elde edilen yeni çıktı dosya sistemine yazılır.
İşlemler ve metodları
-
dosyaları joinleme : Transactions.joinDataFrames
-
net satış adedi ve tutarı (toplam satış adetleri - iptal / iade adetleri) : Transactions.calculateNetSales
-
brüt satış adedi ve tutarı (toplam satış adetleri) : Transactions.calculateGrossSales
-
satıldığı son 5 gündeki satış sayısı ortalamaları : Transactions.calculateAverageSalesPerProduct
-
en çok sattığı yer : Transactions.calculateMostSoldLocationForAllProducts
-
verilerin joinlenmesi : Transactions.joinFinalData
-
verilerin dosya sistemine yazılması : Transactions.writeDataFrame
Job2
Spark, myPackage.Job2 sınıfını çalıştırır. Bu sınıf içinde, iki dosya okunduktan sonra Transactions sınıfında dosyalar birleştirilir, işlemler tamamlanır ve elde edilen iki yeni çıktı dosya sistemine yazılır.
-
dosyaları joinleme : Transactions.joinDataFrames
-
Joinlenmiş datayı dosya sistemine yazma : Transactions.writeJoinData
-
Ürün bazlı fiyat değişimleri tarihleriyle birlikte : Transactions.calculatePriceChanges
-
oluşan çıktıyı dosya sistemine yazma : Transactions.writeDataFrame
-
Port Çakışması 8080 ve 7077 portlarının boş olduğundan emin olun.
-
Docker'ın kurulu olduğundan emin olunuz
-
Docker sürümü bazı sürümlerde docker-compose up kullanılırken bazı sürümlerde ise docker compose up kullanılır. Hata alırsanız bu iki komuttan birini kullanarak çözünüz.