Resilient Distributed Datasets w Apache Spark – Część 1

Dwa poprzednie wpisy były w głównej mierze teoretyczne. Najwyższa pora na pierwsze kody na blogu! Na tapet bierzemy podstawowy typ danych platformy Spark – RDD. Dlaczego akurat taki temat? Poznanie tych kolekcji umożliwia postawienie pierwszych kroków w Sparku, a zrozumienie ich działania pozwala na wydajne przetwarzanie danych.

Do wykonania zadań użyjemy klastra w chmurze, oferowanego przez platformę Databricks w wersji Community. Ta wersja oferuje opcję z jednym węzłem obliczeniowym – to wystarczy do celów ćwiczeniowych.
Program będziemy pisać w PySparku, czyli swoistym API w Pythonie dla Sparka.


Rozpoczniemy od wczytania CSVki zawierajacej filmy i seriale ze znanego serwisu VOD:

netflix_titles = spark.read.csv(
    "dbfs:/FileStore/tables/netflix_titles.csv", header="false"
).rdd

Wywołanie spark.read.csv zwraca DataFrame. Zwykle to właśnie na tym typie operujemy dla danych tabelarycznych, ale do naszych celów szkoleniowych potrzebujemy kolekcji RDD. Z uzyskanego obiektu wyciągamy więc tylko rdd.

Zmienna netflix_titles przechowuje teraz obiekt typu MapPartitionsRDD. Można by się spodziewać, że w tej zmiennej znajdziemy wszystkie wiersze wczytanego arkusza – Nic bardziej mylnego! Jak wspomniałem we wcześniejszym wpisie Spark vs Hadoop, RDD są leniwe (Lazy Evaluated) i dopóki nie wywołamy akcji zwrócenia rezultatów (zobacz akcje materializujące RDD, a transformacje), byty te są jedynie instrukcją wykonania operacji.

Zobaczmy więc jak wygląda przykładowy wiersz w naszym zbiorze:

netflix_titles.take(1)

# [Row(show_id='s1', type='TV Show', title='3%', director=None, cast='João Miguel, Bianca Comparato, Michel Gomes, Rodolfo Valente, Vaneza Oliveira, Rafael Lozano, Viviane Porto, Mel Fronckowiak, Sergio Mamberti, Zezé Motta, Celso Frateschi', country='Brazil', date_added='August 14, 2020', release_year='2020', rating='TV-MA', duration='4 Seasons', listed_in='International TV Shows, TV Dramas, TV Sci-Fi & Fantasy')]

Warto mieć świadomość, że nasze niewinne take jest akcją – powoduje więc załadowanie wszystkich danych z pliku źródłowego. Nie jest to jednak naiwne wczytanie wszystkich danych do pamięci programu (drivera), a rozmieszczenie ich na partycjach (czym są partycje i jak odbywa się taki podział to temat na osobny wpis).

Liczymy dane

Załóżmy, że chcemy teraz policzyć ile filmów i ile seriali zawiera nasz zbiór.
Do realizacji tego zadania przekształcimy aktualne RDD do postaci Paired RDD. Jest to kolekcja z elementami klucz-wartość, czyli typ danych znany do tego stopnia, że znacząca ilość metod ze Spark API (np. groupByKey, reduceByKey i wiele innych) operuje właśnie na nim, a bez niego niemożliwe jest partycjonowanie.

type_pairs = netflix_titles.map(lambda x: (x[1], 1))
type_pairs.take(5) # Zakładamy, że nie ma więcej niż 5 typów

# [('TV Show', 1), ('Movie', 1), ('Movie', 1), ('Movie', 1), ('Movie', 1)]

Rezultat możemy osiągnąć na kilka sposobów. Jednym z pomysłów, który przychodzi na myśl, jest pogrupowanie po typie danej pozycji (po kluczu naszych par). Zobaczmy, jak mogłoby to wyglądać:

Liczymy – wersja z groupByKey

type_pairs.groupByKey().mapValues(sum).take(5)

#  [('TV Show', 2410), ('Movie', 5377), (None, 1), ('William Wyler', 1)]

Hura! Udało się nam dowiedzieć ile mamy poszczególnych pozycji. Cel został osiągnięty. Czy aby jednak na pewno powinniśmy na tym poprzestać?

Spójrzmy na wizualizację DAG – planu logicznego powyższego zapytania:

Wykonanie składa się z 2 etapów (stage’ów). Pojedynczy etap jest zbiorem operacji, do którego wykonania nie jest konieczne wykonanie shufflingu. Pokrótce shuffling jest operacją polegającą na wymianie danych pomiędzy partycjami. Partycje mogą być rozmieszczone na różnych maszynach, stąd operacja wiąże się z koniecznością przesyłania danych przez sieć. Taki transfer jest stosunkowo wolny (w porównaniu np. do zapisów i odczytów w pamięci lub na dysk twardy), dlatego właśnie shufflingu chcemy unikać, a przynajmniej dążyć do jego minimalizacji.

  • Etap 1 (na rys. Stage 11) – Następuje wczytanie danych z CSVki, wykonywane jest też jawne mapowanie DataFrame’a na RDD, przekształcenie do postaci krotek i podział danych na partycje. Warto tutaj omówić, kiedy uruchamiane są poszczególne zadania:
    – Scan csv – Reprezentuje naturalnie wczytanie danych z pliku
    – Map – Wywoływane jest w przypadku zastosowania transformacji względem pojedynczego obiektu (argumentem funkcji przekształcenia jest pojedynczy element kolekcji)
    – MapPartitions – Wywoływane kiedy transformacja dotyczy wszystkich elementów z partycji
    (argumentem funkcji przekształcenia jest kolekcja elementów wchodząca w skład jednej partycji)
  • Etap 2 – Jest konsekwencją wywołania groupByKey. Wszystkie dane są repartycjonowane:
    PartitionBy – Powoduje rozdzielenie danych na partycje na podstawie podanego klucza.

Poznaliśmy sposób realizacji z groupByKey. Wciąż nie mamy jednak punktu odniesienia, na podstawie którego moglibyśmy stwierdzić czy rozwiązanie jest wydajne. Przyjrzyjmy się zatem kolejnej metodzie.

Liczymy – wersja z reduceByKey

type_pairs.reduceByKey(lambda a,b: a + b).take(5)

# [('TV Show', 2410), ('Movie', 5377), (None, 1), ('William Wyler', 1)]

Wynik jest zatem dokładnie taki sam. Spójrzmy na wizualizację DAG:

Logiczny plan wykonania prezentuje się dokładnie tak samo, jak w przypadku groupByKey. Spójrzmy jednak na ilość wymienionych danych w obu przypadkach:

Wykonanie z groupByKey
Wykonanie z reduceByKey

Obserwując jedynie czas przetwarzania poszczególnych etapów, można by odnieść wrażenie, że to wersja z groupByKey poradziła sobie lepiej. Pamiętajmy jednak, że w ramach naszych ćwiczeń operujemy na jednej maszynie obliczeniowej, a nasz zbiór danych jest relatywnie mały. Czas przetwarzania w naszym przypadku nie uwzględnia więc wymiany danych w sieci, która miałaby miejsce w tym przypadku. A wtedy kluczowy byłby rozmiar takiej wymiany. Tutaj różnica jest już dwukrotna i wypada na korzyść reduceByKey.

Dlaczego tak się dzieje? O tym w kolejnym odcinku serii o RDD. Zapraszam 🙂

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany. Wymagane pola są oznaczone *