創建完 ETL 的 Spark Job 後我們要加入資料處理的內容,在預設的程式碼中只能做到資料搬遷,那這次我們的目標是要找出每個 user 最常購買的前五名商品,這部分會使用 PySpark 的進行
接下來會以修改後的程式進行說明,以下是每個 user 購買數量前五名商品的 PySpark 程式碼
- 為 Glue Job 初始化需要使用的 Library,此部分在預設的程式碼中會自動產生
- DynamicFrame 要將 Spark 的 DataFrame 的資料格式轉為 Glue 的 DynamicFrame
- pyspark.sql.functions 與 pyspark.sql.window 則是之後再使用 pyspark SQL 時會使用到的 Library
- Glue Job 初始化
- 第一個資料源的程式碼會由 Glue 自動產生,會對應到 Data Catalog 中的 DB 與 Table
- 這邊我們在多加兩個資料源因為我們需要 order_products_prior、order 與 products 這三張表一起 Join 的資料,程式部分可以複製第一個資料源的程式碼,並修改後面的 database、table_name、transformation_ctx 參數
- Join 的 Function 可以從右上角的 Transform 直接匯入,Transform 有提供常見的 Function 可以使用,除了 Join 還有 Filter、SplitFields、DropNullFields 等等的 Function 可以使用
- 需要注意的是 Transform 的 Function 要在 DynamicFrame 的格式下使用,如果已經像 join_products 一樣,已經透過最後面的 .toDF() 將 DynamicFrame 轉為 DataFrame,這個狀況下就無法再接著使用 Transform 的 Function
- Join 的 Function 中 frame1、frame2 代表著要 Join 的兩張 Table,key1 代表 frame1 要用來 Join 的欄位,key2 也是同理
請持續鎖定 Nextlink 架構師專欄,以獲得最新專業資訊喔!
若您有任何 AWS 需求,歡迎與我們聯繫!