"Notebook'um Çok Yavaş!" — Fabric Spark Optimizasyonu
Microsoft Fabric forumlarında en sık paylaşılan şikayet: _"Notebook çalışmam 45 dakika sürüyor ama aynı veriyi SQL'de 2 dakikada işliyorum."_ Bunun nedeni genellikle yanlış Spark yapılandırması ve optimize edilmemiş PySpark kodudur.
Bu rehberde Fabric'te Spark ile verimli çalışmanın kurallarını paylaşıyoruz.
Notebook vs Spark Job Definition
| Özellik | Notebook | Spark Job Definition |
|---|---|---|
| Arayüz | İnteraktif, hücre bazlı | Kod dosyası (.py) |
| Kullanım | Geliştirme, analiz, test | Zamanlı üretim işleri |
| Session başlatma | İlk çalıştırmada (~30sn) | Her çalıştırmada |
| Pipeline entegrasyonu | ✅ | ✅ |
| Parametreli çalışma | ✅ (parametrized cells) | ✅ (argümanlar) |
| Önerilen | Geliştirme aşaması | Üretim pipeline'ları |
PySpark Optimizasyon Kuralları
Kural 1: Gereksiz Collect/toPandas Kullanmayın
class="code-comment"># YANLIŞ — tüm veriyi driver'a çeker
df = spark.read.format(class="code-string">"delta").load(class="code-string">"Tables/fact_satis")
pandas_df = df.toPandas() class="code-comment"># 100M satır → bellek hatası
sonuc = pandas_df.groupby(class="code-string">"sehir").sum()
class="code-comment"># DOĞRU — Spark distributed olarak işler
df = spark.read.format(class="code-string">"delta").load(class="code-string">"Tables/fact_satis")
sonuc = df.groupBy(class="code-string">"sehir").agg(F.sum(class="code-string">"tutar").alias(class="code-string">"toplam"))
sonuc.write.format(class="code-string">"delta").mode(class="code-string">"overwrite").save(class="code-string">"Tables/satis_ozet")collect(), toPandas() ve show() büyük veri setlerinde asla kullanılmamalıdır. Tüm veriyi tek makineye çeker ve bellek hatasına yol açar.Kural 2: Partition Pruning Kullanın
class="code-comment"># YANLIŞ — tüm tabloyu tarar
df = spark.read.format(class="code-string">"delta").load(class="code-string">"Tables/fact_satis")
filtreli = df.filter(F.col(class="code-string">"yil") == class="code-number">2024)
class="code-comment"># DOĞRU — partitioned tablo oluşturun ve partition pruning'den faydalanın
class="code-comment"># Tablo oluştururken:
df.write.format(class="code-string">"delta").partitionBy(class="code-string">"yil", class="code-string">"ay") \
.mode(class="code-string">"overwrite").save(class="code-string">"Tables/fact_satis_partitioned")
class="code-comment"># Sorgularken Spark sadece ilgili partition'ı okur:
df = spark.read.format(class="code-string">"delta").load(class="code-string">"Tables/fact_satis_partitioned")
filtreli = df.filter((F.col(class="code-string">"yil") == class="code-number">2024) & (F.col(class="code-string">"ay") == class="code-number">6))
class="code-comment"># Sadece yil=class="code-number">2024/ay=class="code-number">6 partition'ı okunur, diğerleri atlanırKural 3: Broadcast Join Kullanın
from pyspark.sql.functions import broadcast
class="code-comment"># Küçük dimension tablosu ile büyük fact tablosunu join'lerken
dim_urun = spark.read.format(class="code-string">"delta").load(class="code-string">"Tables/dim_urun") class="code-comment"># 10K satır
fact_satis = spark.read.format(class="code-string">"delta").load(class="code-string">"Tables/fact_satis") class="code-comment"># 100M satır
class="code-comment"># YANLIŞ — normal join, shuffle gerektirir
sonuc = fact_satis.join(dim_urun, class="code-string">"urun_id")
class="code-comment"># DOĞRU — broadcast join, küçük tabloyu tüm node'lara dağıtır
sonuc = fact_satis.join(broadcast(dim_urun), class="code-string">"urun_id")
class="code-comment"># Shuffle yok → çok daha hızlıKural 4: Cache Stratejik Kullanın
class="code-comment"># Aynı DataFrame birden fazla kez kullanılacaksa cache'leyin
df_temiz = spark.read.format(class="code-string">"delta").load(class="code-string">"Tables/fact_satis") \
.filter(F.col(class="code-string">"tutar") > class="code-number">0) \
.withColumn(class="code-string">"kdv", F.col(class="code-string">"tutar") * class="code-number">0.20)
class="code-comment"># Birden fazla işlemde kullanılacak
df_temiz.cache()
class="code-comment"># İşlem class="code-number">1: Şehir bazlı özet
sehir_ozet = df_temiz.groupBy(class="code-string">"sehir").agg(F.sum(class="code-string">"tutar"))
sehir_ozet.write.format(class="code-string">"delta").mode(class="code-string">"overwrite").save(class="code-string">"Tables/sehir_ozet")
class="code-comment"># İşlem class="code-number">2: Ürün bazlı özet
urun_ozet = df_temiz.groupBy(class="code-string">"urun_id").agg(F.sum(class="code-string">"tutar"))
urun_ozet.write.format(class="code-string">"delta").mode(class="code-string">"overwrite").save(class="code-string">"Tables/urun_ozet")
class="code-comment"># İşlem bittikten sonra cache'i temizleyin
df_temiz.unpersist()V-Order ve Optimize
Fabric'te Delta tablolarının performansını artırmak için:
class="code-comment"># V-Order ile yazma (Fabric'e özel optimizasyon)
df.write.format(class="code-string">"delta") \
.option(class="code-string">"vorder", class="code-string">"true") \
.mode(class="code-string">"overwrite") \
.save(class="code-string">"Tables/fact_satis_optimized")
class="code-comment"># Mevcut tabloyu optimize etme
spark.sql(class="code-string">"OPTIMIZE fact_satis_optimized")
class="code-comment"># Eski dosyaları temizleme
spark.sql(class="code-string">"VACUUM fact_satis_optimized RETAIN class="code-number">168 HOURS")Spark Session Yapılandırması
class="code-comment"># Notebook'ta Spark yapılandırması
class="code-comment"># Hücre class="code-number">1'de (en başta) çalıştırın
class="code-comment"># Executor bellek ve core ayarı
spark.conf.set(class="code-string">"spark.executor.memory", class="code-string">"8g")
spark.conf.set(class="code-string">"spark.executor.cores", class="code-string">"class="code-number">4")
spark.conf.set(class="code-string">"spark.sql.shuffle.partitions", class="code-string">"class="code-number">200")
class="code-comment"># Adaptive Query Execution (önerilir)
spark.conf.set(class="code-string">"spark.sql.adaptive.enabled", class="code-string">"true")
spark.conf.set(class="code-string">"spark.sql.adaptive.coalescePartitions.enabled", class="code-string">"true")Forumlarda En Çok Sorulan Spark Soruları
"Session başlatma neden 30 saniye sürüyor?"
Fabric'te Spark pool cold start süresi vardır. High Concurrency mode kullanarak session paylaşımı yapabilirsiniz.
"Notebook'um pipeline'dan çalışınca hata veriyor"
Notebook'ta interaktif widget'lar (display, plotly) pipeline modunda çalışmaz. Pipeline'dan çağrılacak notebook'larda sadece veri işleme kodu olmalı.
"Delta tablosu boyutu neden sürekli artıyor?"
Delta Lake her yazma işleminde yeni Parquet dosyaları oluşturur, eskileri silmez (time travel için). VACUUM komutuyla eski dosyaları düzenli temizleyin.
Sonuç
Fabric'te Spark ile verimli çalışmak için: collect()/toPandas() kullanmayın, partition pruning'den faydalanın, küçük tablolarda broadcast join kullanın ve V-Order ile yazın. Geliştirme için Notebook, üretim için Spark Job Definition tercih edin. Bu kurallarla 45 dakikalık Notebook'u 3 dakikaya düşürebilirsiniz.
