שלום לכולם,
אחרי הפסקה קצת ארוכה בגלל אוגוסט – חופש ילדים – אתם יודעים איך זה.
היום נחזור לדבר על קוד ריאקטיבי, והפעם נתחיל ללמוד איך קוראים את התיעוד של Reactor
ובמיוחד את דיאגרמת מארבל.
ואחרי זה נמשיך לכמה דוגמאות קוד קצרות אשר יתנו לנו הבנה איך הספריה הזאת עובדת.
אז בואו נתחיל
אז איך קוראים את הדיאגרמת מארבל?
כאשר הצגתי בפוסט הקדום בסדרה את ספריית Reactor
דיברנו על שתי אבני בניין עיקריות: Flux
ואת Mono
.
עבור שתיהן הצגנו דוגמה של ה-marble diagram
, אשר מסביר את ה-flow של כל אחד מהם.
הסיבה שחשוב להבין איך קוראים את הגרפים הללו היא שהם הבסיס לכל התיעוד של Reactor
, וגם הם מופיעים ב-javadoc.
ולכן בתחילת הפוסט הזה אנחנו נתעמק ונבין איך אנחנו קוראים את הגרפים הללו.
דיאגרמת מארבל של Flux Operators
חלק מהאופרטורים אנחנו מבצעים על מופע של Flux
או של Mono
.
הם נקראים Flux operators
לטעם הנוחות, אבל חשוב להבין שזה תופס עבור שתי אבני הבניין.
Flux<T> output = source.fluxOperator()
הגרף שלהם נראה ככה:
בואו נפרק את ה-marble diagram קצת:
בשורה העליונה יש לנו את ה-Flux
המקור. כלומר מי שמכניס מידע לאופרטור.
כל עיגול על השורה הזאת זה אלמנט אשר עובר בו. כלומר זהו איוונט בשם onNext
.
כל אלמנט עובר דרך האופרטור הזה.
השורה התחתונה היא הפלט של ה-Flux
, אנחנו יכולים לראות שבמקרה הזה האלמנטים השתנו, אך זה לא חייב להיות ככה (יש אופרטורים שלא משנים).
נשים לב שבסוף השורה הראשונה יש לנו קו שחור עבה. זה הסימן שהגענו ל-onComplete()
. הוא בשורה העליונה כי זה סימן שנכנס לתוך האופרטור על מנת לסמן לו שהוא סיים.
ובשורה התחתונה יש לנו את ה-X האדום, שמסמן שגיאה. זה בשורה התחתונה כי זה מה שהאופרטור מוציא אם יש לו שגיאה.
דיאגרמת מארבל של Static operators
האופרטורים הללו הם סטאטיים, כלומר הם לא פועלים על מופע ספציפי של Flux
או של Mono
.
Flux<T> output = Flux.merge(sourceFlux1, sourcePublisher2);
גם כאן, יש לנו שני צירים – העליון של הקלט והתחתון של הפלט.
אם יש לנו קו דק בסוף החלק העליון זה מסמן את הסוף של הקלט.
התנהגויות שונות עבור אלמנטים שונים
יש אופרטורים ב-Reactor
שלא מתנהגים אותו הדבר עבור כל האלמנטים.
למשל החצי השמאלי בדיאגרמה הזאת פולט רק את הכחול, ולא את הצהוב והירוק.
החץ הכחול שעולה כלפי מעלה הם סיגננלים אשר עוברים מהאופרטור למי שדוחף את האלמנטים (למשל כמה עוד אלמנטים לשלוח).
בחצי הימני של הגרף אנחנו רואים אלמנטים שבוטלו (העיגולים הלבנים), בגלל שהאופרטור העביר את ההודעה cancle()
אלו הדיאגרמות הכי נפוצות שתראו.
דיאגרמת מארבל – דוגמאות מורכבות
אך לפעמים יש התנהגויות יותר מורכבות, כמו למשל ה-ParallelFlux
יוצר כמה קווים מתחתיו בתור הפלט שלו כמו בגרף למטה.
אופרטורים מסוג חלונות מייצרים פלט מסוג Flux<Flux<T>>
.
ה-Flux
העיקרי מודיע על כל פתיחת חלון, בזמן שה-Flux
הפנימי מייצג את התוכן של החלונות הללו, ואת הסיום שלהם.
החלונות מיוצגים על ידי הסתעפויות מתוך הפלט המקורי
במקרים אחרים אופרטורים יכולים לקבל Publisher
בתור פרמטר.
הפרמטרים הללו יעזרו לשנות את ההתנהגות של האופרטור בהתאם לפרמטר.
מקרים כאלו יראו ככה בדיאגרמה
מקרא מלא של הדיאגרמת מארבל
אז אחרי שראינו את רוב המרכיבים הנפוצים של הדיאגרמות, בוא נראה את המקרא המלא שלהן
ובנוסף יש לנו מקרא של כל ה-side effects
דוגמאות קוד
סיימנו עם התיאוריה להיום.
אבל אני לא רוצה להשאיר אתכם בלי התקדמות גם בצד הקוד, ולכן בואו נסתכל קצת על מקרים פשוטים של כתיבת קוד ריאקטיבי ב-Reactor
hello world
כמו שאתם בוודאי יודעים, כל מסע מתחיל ב-Hello World!
כמו שכבר דיברנו בעבר, Mono
מייצג תוצאה אסינכרונית של 0-1 איברים.
Mono<String> service Result = Mono.just("Hello World!");
String result = serviceResult.block();
assertEquals("Hello World!", result);
בדוגמה למעלה יש לנו בשורה הראשונה יצירה של Mono
שמכיל איברים מסוג מחרוזת.
זה מה שה-just()
מבצע בשבילנו.
בשורה השנייה, אנחנו מחזירים את האלמנט שה-Mono
הזה מחזיר על ידי קריאה ל-block()
האופרטור block
יהפוך את הקוד האסינכרוני לקוד סינכרוני (כלומר יחסום את הריצה של הקוד עד שה-publisher יסיים).
שימוש ב-timeout
כאשר אתם מנסים למשוך מידע בצורה ריאקטיבית, אתם לא רוצים שהקוד שלכם יחכה עד אינסוף לתוצאה. מה יקרה אם הסרביס שאתו אתם מנסים לדבר לא מגיב?
בשביל זה יש לנו את האפשרות לשים timeout לקריאות הריאקטיביות שלנו
Mono<String> unresponsiveService = Mono.never();
String result = unresponsiveService.block(Duration.of(1, ChronoUnit.SECONDS));
הקוד למעלה יזרוק לנו שגיאה מסוג IllegalStateException
בגלל ש-Mono.never()
לא מחזיר אף פעם תשובה.
ולכן, במקרה הזה תעבור שניה, ואז יגיע ה-timeout ועליו תיזרק השגיאה
תוצאות ריקות
סרביסים הם לא צפויים, הם יכולים להחזיר תוצאות, ולפעמים הם לא יחזירו כלום, ואף אחד לא אוהב NullPointerException
. איך מתמודדים עם מצב כזה:
Mono<String> serviceResult = Mono.empty();
Optional<String> optionalServiceResult = serviceResult.blockOptional();
assertTrue(optionalServiceResult.isEmpty());
במקרה הזה אנחנו משתמשים ב-blockOptional
שידע להחזיר לנו Optional
כדי שנשמור על עצמנו.
ואכן, למי שתהה Mono.empty
מחזיר פשוט publisher ריק שלא מדווח שום אלמנט.
רק את האלמנט הראשון
אז שיחקנו קצת עם Mono
, אבל יש מקרים בהם אנחנו מצפים לקבל כמה אלמנטים ולא רק 1 או 0.
בשביל זה יש לנו את Flux
. למי שלא זוכר Flux
מייצר לנו רצף אסינכרוני עבור 0-N אלמנטים.
אבל מה יקרה אם אנחנו רוצים רק את האלמנט הראשון שיגיע?
ובכן בשביל זה יש לנו את blockFirst
Flux<String> serviceResult = Flux.just("valid result", "oops, you collected to many, and you broke the service...");
String result = serviceResult.blockFirst();
assertEquals("valid result", result);
במקרה זה יצרנו Flux
אשר מחזיק שני אלמנטים.
והשתמשנו ב-blockFirst
על מנת לקבל רק את האיבר הראשון
לאסוף לרשימה
אוקי, נניח ועכשיו אנחנו רוצים לאסוף את כל האלמנטים ש-publisher מסויים שולח לנו לרשימה
Flux<String> serviceResult = Flux.just("Walmart", "Amazon", "Apple", "CVS Health", "UnitedHealth Group");
List<String> results = serviceResult.collectList().block();
במקרה הזה קיבלנו Flux
עם חמישה אלמנטים.
אנחנו משתמשים ב-collectList
על מנת לאסוף את כולם לרשימה
קוד אסינכרוני, אבל באמת
עד עכשיו קצת רימינו כי השתמשנו ב-block
על צורותיו השונות על מנת להפוך את הקוד האסינכרוני לקוד סינכרוני.
אבל זה קצת מאבד מהטעם של לכתוב קוד אסינכרוני.
שימוש בקוד חוסם (blocking code) משמש בעיקר לטסטים או כשאין לנו ברירה אחרת, ואנחנו חייבים לחזור לעולם הסינכרוני.
אז בואו ננסה לעשות את אותו הדבר כמו מקודם, רק בצורה אסינכרונית:
CopyOnWriteArrayList<String> companyList = new CopyOnWriteArrayList<>();
Flux<String> serviceResult = Flux.just("Walmart", "Amazon", "Apple", "CVS Health", "UnitedHealth Group");
serviceResult.doOnNext(companyList::add)
.subscribe();
למי שלא מכיר את האובייקט CopyOnWriteArrayList
זה וריאציה של thread safe ArrayList
.
בדוגמה הזאת אנחנו משתמשים באופרטור doOnNext
על מנת לקבל את האלמנט הבא.
שימו לב שאנחנו מעבירים ל-doOnNext
את הפונקציה שאנחנו רוצים שתרוץ.
מה שקורה פה בעצם הוא שכל אלמנט ב-Flux
נכנס כפרמטר עבור companyList.add
.
וכאשר אנחנו קוראים ל-subscribe
אנחנו בעצם מסמנים ל-publisher
שאנחנו מוכנים לקבל אלמנטים.
מה עושים כשהסטרים נגמר?
אוקי, אז הוספנו את כל האיברים לרשימה בצורה אסינכרונית.
אבל יש עוד סיגנל שלא טיפלנו בו עדיין והוא onComplete
.
אז בואו נוסיף את זה עכשיו
CopyOnWriteArrayList<String> companyList = new CopyOnWriteArrayList<>();
Flux<String> serviceResult = Flux.just("Walmart", "Amazon", "Apple", "CVS Health", "UnitedHealth Group");
serviceResult.doOnNext(companyList::add)
.doOnComplete(System.out.println("Complete"))
.subscribe();
כאן אנחנו משתמשים ב-doOnComplete
.
האופרטור הזה נקרא כאשר הסיגנל onComplete
מגיע.
סיכום
עברנו הרבה בפוסט הזה.
למדנו איך אנחנו קוראים את הדיאגרמות מארבל של Reactor
מה כל אחד מהסימנים בדיאגרמות האלו מסמן, ומה ה-flow הכללי שלהם.
ובחלק השני הסתכלנו קצת על דוגמאות קוד בסיסיות.