Resilient Distributed Datasets (RDD) w Apache Spark – Część 2

Kolejna, podstawowa dawka wiedzy o kolekcjach RDD. Wpis jest kontynuacją Resilient Distributed Datasets w Apache Spark – Część 1.


W poprzedniej części artykułu pokazaliśmy wyniki wywołania groupByKey i reduceByKey. Spróbujmy zobrazować różnicę w działaniu tych dwóch, podstawowych funkcji:

Legenda do obrazowanych groupByKey i reduceByKey
Liczenie z użyciem groupByKey

Zauważmy, że groupByKey nie wykonuje żadnych operacji na pierwszym poziomie. Wszystkie rekordy wejściowe tej operacji są zatem przesyłane pomiędzy partycjami. W przypadku dużej liczby rekordów ilość takich wymian byłaby bardzo duża.

Spójrzmy na wykonanie z reduceByKey:

Liczenie z użyciem reduceByKey

Istotna jest w tym przypadku operacja, która wykonała się już na pierwszym poziomie. Zamiast wysyłania wszystkich krotek pomiędzy partycjami, reduceByKey sprytnie wykonuje zliczanie dostępnych na danej partycji kluczy jeszcze przed ich wysyłką. Powoduje to zmniejszenie rozmiaru wysyłanych przez sieć danych, a w konsekwencji szybsze wykonanie zadania.

Liczymy – wersja z countByKey

type_pairs.countByKey()

# defaultdict(int, {'TV Show': 2410, 'Movie': 5377, None: 1, 'William Wyler': 1})

Wywołanie wygląda prosto. CountByKey jest akcją, więc nie musimy dokładać take, jak robiliśmy to w dwóch poprzednich przykładach. Spójrzmy, jak prezentuje się DAG:

Tutaj też przebieg jest wyjątkowo prosty. Wygląda tak, jakbyśmy wycięli drugi etap od wykonania groupByKey lub reduceByKey. Unikamy shufflingu i minimalizujemy ilość przesyłanych danych. Czy zatem znaleźliśmy najlepsze rozwiązanie naszego zadania? – Jak pewnie się domyślasz, byłoby zbyt pięknie.
CountByKey w odróżnieniu od dwóch poprzednio omawianych funkcji wykonuje całą operację redukcji danych na driverze. Zachodzi tutaj więc scalanie danych z wszystkich partycji i obliczenie ich w jednym, głównym procesie Sparka. Jeżeli w tym przypadku działamy z dużym zbiorem danych, to takie łączenie w jednym miejscu będzie niewydajne.

Podsumowując, kiedy rozmiar danych jest duży, najrozsądniejszym wyborem dla naszego zliczania byłoby zastosowanie reduceByKey. Użycie groupByKey skutkuje wysoką ilością shufflingu, a dla kontrastu countByKey wykonuje zliczanie wszystkich danych w driverze Sparka.

Persystencja RDD

Czasem zachodzi potrzeba wielokrotnego wykonania tej samej części RDD, w różnych miejscach programu. Jeżeli podejdziemy do tego w tradycyjny sposób, za każdym razem wspólna część wywołania będzie obliczana ponownie. Spójrzmy na poniższy przykład:

type_pairs = netflix_titles.map(lambda x: (x[1], 1))
count_all_pairs = type_pairs.count()
count_types = type_pairs.reduceByKey(lambda a,b: a + b).take(5)

Skorzystaliśmy dwukrotnie z RDD type_pairs. Jak pamiętasz, RDD nie są ewaluowane aż do momentu wywołania na nich akcji. Oznacza to, że w tym przypadku ich materializacja zachodzi dwukrotnie, a przecież pomiędzy tymi dwoma wywołaniami RDD nie uległo zmianie. Moglibyśmy więc po pierwszym wywołaniu zapamiętać jego stan i użyć go do kolejnych. Dokładnie z taką pomocą przychodzą nam funkcje cache i persist.

type_pairs = netflix_titles.map(lambda x: (x[1], 1)).cache()
count_all_pairs = type_pairs.count()
count_types = type_pairs.reduceByKey(lambda a,b: a + b).take(5)

Różnicę widać wyłącznie w pierwszej linijce i dotyczy bezpośrednio naszej kolekcji RDD. Przechowujemy w ten sposób w pamięci zmaterializowaną postać RDD i korzystamy z niej dwukrotnie. Unikamy tym samym ponownego, zbędnego obliczenia, jak miało to miejsce w poprzednim przykładzie.

DAG bez cache’a (po lewej) i z cache (po prawej)

Jak widać na samym DAG, zjawisko persystencji sygnalizowane jest krokiem w postaci zielonej diody – oznaczenie małe, ale diametralnie zmienia wywołanie programu 🙂

Uważny czytelnik może zapytać: „No dobrze, omówiliśmy cache, ale co z persist, o którym wcześniej była mowa?”. Jaka jest więc pomiędzy nimi różnica? Otóż samo cache() jest jedynie cukrem syntaktycznym dla wywołania persist(StorageLevel.MEMORY_ONLY). Funkcja persist daje nam zatem większą kontrolę nad tym, gdzie RDD zostanie zapisane. Zobaczmy jakie podstawowe poziomy persystencji możemy uzyskać:

  • DISK_ONLY – Deserializowane obiekty RDD, przechowywane wyłącznie na dysku
  • MEMORY_AND_DISK – Deserializowane obiekty RDD; Jeżeli obiekty nie mieszczą się w pamięci, wówczas partycje które przekraczają rozmiar będą przechowywane na dysku i w razie konieczności stamtąd wczytywane
  • MEMORY_AND_DISK_SER – Serializowane obiekty RDD, przechowywane w pamięci a ich nadmiar przenoszony na dysk, analogicznie do zachowania MEMORY_AND_DISK
  • MEMORY_ONLY – Deserializowane obiekty RDD, przechowywane wyłącznie pamięci
  • MEMORY_ONLY_SER – Serializowane obiekty RDD, przechowywane wyłącznie w pamięci
  • OFF_HEAP – Serializowane obiekty RDD, w przeciwieństwie do poprzednich punktów obiekty są tutaj alokowane poza JVM, a sterowanie ich czasem życia odbywa się przez aplikację (a nie Garbage Collector). Podejście to redukuje częste użycie GC, ale przenosi też odpowiedzialność zarządzania pamięcią na aplikację.


Omówiliśmy różne metody przetwarzania danych bezpośrednio na kolekcjach RDD. Dowiedzieliśmy się także, że istnieje wiele metod osiągnięcia tego samego rezultatu, operując na tych samych strukturach danych. Na praktycznych przykładach zobaczyliśmy, jak duże znaczenie ma wybór odpowiednich funkcji i rozwiązań, kiedy w grę wchodzi przetwarzanie w środowisku rozproszonym. Warto zwrócić uwagę, że w obu wpisach operowaliśmy na stosunkowo niewielkim zbiorze danych – Im rozmiar byłby jednak większy, tym różnice byłyby bardziej widoczne. Analiza logicznego planu przetwarzania zapytań w postaci DAG ułatwia optymalizację przetwarzania oraz umożliwia przetestowanie rozwiązania na mniejszym wycinku danych, zanim zostanie ono zaaplikowane na większą skalę.

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany. Wymagane pola są oznaczone *