"Notebook'um Çok Yavaş!" — Fabric Spark Optimizasyonu

Microsoft Fabric forumlarında en sık paylaşılan şikayet: _"Notebook çalışmam 45 dakika sürüyor ama aynı veriyi SQL'de 2 dakikada işliyorum."_ Bunun nedeni genellikle yanlış Spark yapılandırması ve optimize edilmemiş PySpark kodudur.

Bu rehberde Fabric'te Spark ile verimli çalışmanın kurallarını paylaşıyoruz.

Notebook vs Spark Job Definition

| Özellik | Notebook | Spark Job Definition |

|---|---|---|

| Arayüz | İnteraktif, hücre bazlı | Kod dosyası (.py) |

| Kullanım | Geliştirme, analiz, test | Zamanlı üretim işleri |

| Session başlatma | İlk çalıştırmada (~30sn) | Her çalıştırmada |

| Pipeline entegrasyonu | ✅ | ✅ |

| Parametreli çalışma | ✅ (parametrized cells) | ✅ (argümanlar) |

| Önerilen | Geliştirme aşaması | Üretim pipeline'ları |

PySpark Optimizasyon Kuralları

Kural 1: Gereksiz Collect/toPandas Kullanmayın

Python
class="code-comment"># YANLIŞ — tüm veriyi driver'a çeker
df = spark.read.format(class="code-string">"delta").load(class="code-string">"Tables/fact_satis")
pandas_df = df.toPandas()  class="code-comment"># 100M satır → bellek hatası
sonuc = pandas_df.groupby(class="code-string">"sehir").sum()

class="code-comment"># DOĞRU — Spark distributed olarak işler
df = spark.read.format(class="code-string">"delta").load(class="code-string">"Tables/fact_satis")
sonuc = df.groupBy(class="code-string">"sehir").agg(F.sum(class="code-string">"tutar").alias(class="code-string">"toplam"))
sonuc.write.format(class="code-string">"delta").mode(class="code-string">"overwrite").save(class="code-string">"Tables/satis_ozet")
⚠️collect(), toPandas() ve show() büyük veri setlerinde asla kullanılmamalıdır. Tüm veriyi tek makineye çeker ve bellek hatasına yol açar.

Kural 2: Partition Pruning Kullanın

Python
class="code-comment"># YANLIŞ — tüm tabloyu tarar
df = spark.read.format(class="code-string">"delta").load(class="code-string">"Tables/fact_satis")
filtreli = df.filter(F.col(class="code-string">"yil") == class="code-number">2024)

class="code-comment"># DOĞRU — partitioned tablo oluşturun ve partition pruning'den faydalanın
class="code-comment"># Tablo oluştururken:
df.write.format(class="code-string">"delta").partitionBy(class="code-string">"yil", class="code-string">"ay") \
    .mode(class="code-string">"overwrite").save(class="code-string">"Tables/fact_satis_partitioned")

class="code-comment"># Sorgularken Spark sadece ilgili partition'ı okur:
df = spark.read.format(class="code-string">"delta").load(class="code-string">"Tables/fact_satis_partitioned")
filtreli = df.filter((F.col(class="code-string">"yil") == class="code-number">2024) & (F.col(class="code-string">"ay") == class="code-number">6))
class="code-comment"># Sadece yil=class="code-number">2024/ay=class="code-number">6 partition'ı okunur, diğerleri atlanır

Kural 3: Broadcast Join Kullanın

Python
from pyspark.sql.functions import broadcast

class="code-comment"># Küçük dimension tablosu ile büyük fact tablosunu join'lerken
dim_urun = spark.read.format(class="code-string">"delta").load(class="code-string">"Tables/dim_urun")  class="code-comment"># 10K satır
fact_satis = spark.read.format(class="code-string">"delta").load(class="code-string">"Tables/fact_satis")  class="code-comment"># 100M satır

class="code-comment"># YANLIŞ — normal join, shuffle gerektirir
sonuc = fact_satis.join(dim_urun, class="code-string">"urun_id")

class="code-comment"># DOĞRU — broadcast join, küçük tabloyu tüm node'lara dağıtır
sonuc = fact_satis.join(broadcast(dim_urun), class="code-string">"urun_id")
class="code-comment"># Shuffle yok → çok daha hızlı

Kural 4: Cache Stratejik Kullanın

Python
class="code-comment"># Aynı DataFrame birden fazla kez kullanılacaksa cache'leyin
df_temiz = spark.read.format(class="code-string">"delta").load(class="code-string">"Tables/fact_satis") \
    .filter(F.col(class="code-string">"tutar") > class="code-number">0) \
    .withColumn(class="code-string">"kdv", F.col(class="code-string">"tutar") * class="code-number">0.20)

class="code-comment"># Birden fazla işlemde kullanılacak
df_temiz.cache()

class="code-comment"># İşlem class="code-number">1: Şehir bazlı özet
sehir_ozet = df_temiz.groupBy(class="code-string">"sehir").agg(F.sum(class="code-string">"tutar"))
sehir_ozet.write.format(class="code-string">"delta").mode(class="code-string">"overwrite").save(class="code-string">"Tables/sehir_ozet")

class="code-comment"># İşlem class="code-number">2: Ürün bazlı özet
urun_ozet = df_temiz.groupBy(class="code-string">"urun_id").agg(F.sum(class="code-string">"tutar"))
urun_ozet.write.format(class="code-string">"delta").mode(class="code-string">"overwrite").save(class="code-string">"Tables/urun_ozet")

class="code-comment"># İşlem bittikten sonra cache'i temizleyin
df_temiz.unpersist()

V-Order ve Optimize

Fabric'te Delta tablolarının performansını artırmak için:

Python
class="code-comment"># V-Order ile yazma (Fabric'e özel optimizasyon)
df.write.format(class="code-string">"delta") \
    .option(class="code-string">"vorder", class="code-string">"true") \
    .mode(class="code-string">"overwrite") \
    .save(class="code-string">"Tables/fact_satis_optimized")

class="code-comment"># Mevcut tabloyu optimize etme
spark.sql(class="code-string">"OPTIMIZE fact_satis_optimized")

class="code-comment"># Eski dosyaları temizleme
spark.sql(class="code-string">"VACUUM fact_satis_optimized RETAIN class="code-number">168 HOURS")
Fabric Spark İşlem Pipeline
KaynaklarERP, CRM, APIStagingHam VeriVeri AmbarıStar SchemaRaporlamaPower BIUçtan Uca Veri Pipeline Mimarisi

Spark Session Yapılandırması

Python
class="code-comment"># Notebook'ta Spark yapılandırması
class="code-comment"># Hücre class="code-number">1'de (en başta) çalıştırın

class="code-comment"># Executor bellek ve core ayarı
spark.conf.set(class="code-string">"spark.executor.memory", class="code-string">"8g")
spark.conf.set(class="code-string">"spark.executor.cores", class="code-string">"class="code-number">4")
spark.conf.set(class="code-string">"spark.sql.shuffle.partitions", class="code-string">"class="code-number">200")

class="code-comment"># Adaptive Query Execution (önerilir)
spark.conf.set(class="code-string">"spark.sql.adaptive.enabled", class="code-string">"true")
spark.conf.set(class="code-string">"spark.sql.adaptive.coalescePartitions.enabled", class="code-string">"true")

Forumlarda En Çok Sorulan Spark Soruları

"Session başlatma neden 30 saniye sürüyor?"

Fabric'te Spark pool cold start süresi vardır. High Concurrency mode kullanarak session paylaşımı yapabilirsiniz.

"Notebook'um pipeline'dan çalışınca hata veriyor"

Notebook'ta interaktif widget'lar (display, plotly) pipeline modunda çalışmaz. Pipeline'dan çağrılacak notebook'larda sadece veri işleme kodu olmalı.

"Delta tablosu boyutu neden sürekli artıyor?"

Delta Lake her yazma işleminde yeni Parquet dosyaları oluşturur, eskileri silmez (time travel için). VACUUM komutuyla eski dosyaları düzenli temizleyin.

Sonuç

Fabric'te Spark ile verimli çalışmak için: collect()/toPandas() kullanmayın, partition pruning'den faydalanın, küçük tablolarda broadcast join kullanın ve V-Order ile yazın. Geliştirme için Notebook, üretim için Spark Job Definition tercih edin. Bu kurallarla 45 dakikalık Notebook'u 3 dakikaya düşürebilirsiniz.