תכנות ריאקטיבי איך ליצור Flux או Mono ולבצע subscribe
7 דקות קריאה
שלום לכולם,
היום אנחנו ממשיכים בסדרה שלנו על reactive programming.
בפעם הקודמת, למדנו על מהי ספריית Reactor, ומהם אבני הבניין שלה: Flux, Mono
ראינו גם איך אנחנו מסוגלים ליצור את ה-publishers האלו
הפעם אנחנו נמשיך ללמוד איך אנחנו עובדים עם האובייקטים הללו
איך אנחנו נרשמים אליהם (subscribe()) ואיך אנחנו מקבלים מהם מידע.
בואו נתחיל
הרשמה למפרסם - Subscribe
כאשר זה מגיע להרשמה ל-publisher ב-Reactor
האובייקטים Flux ו-Mono משתמשים ב Java 8 lambdas.
יש לנו מגוון רחב מאוד של אפשרויות להרשמה.
למשל יש לנו subscribe() שמקבל למבדה לשילובים שונים של קולבקים כמו בדוגמה הבאה:
subscribe(); subscribe(Consumer<? super T> consumer); subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer); subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer); subscribe(Consumer<? super T> consumer,Consumer<? super Throwable> errorConsumer,Runnable completeConsumer,Consumer<? super Subscription> subscriptionConsumer);
האופציה הראשונה היא הרשמה אשר תתחיל את הזרמת המידע
השניה מבצעת פעולה כלשהי על כל ערך שעובר דרכה
האופציה השלישית נותנת לנו טיפול בשגיאות מעבר לטיפול בנתונים תקינים
ברביעית אנחנו נשתמש אם יש פעולה שאנחנו רוצים לבצע ברגע שהרצף מסתיים בצורה תקינה
ובסוף, יש לנו את האופציה אשר נותנת לנו לטפל ב-subscription אשר חוזר מהמתודה subscribe()
כל האופציות הנ"ל מחזירות רפרנס ל-subscription שאנחנו יכולים להשתמש בהם כדי לבטל את ההרשמה כשאין יותר צורך במידע נוסף.
כאשר אנחנו נבטל את ה-subscription המקור צריך להפסיק לספק ערכים חדשים, ולנקות את כל המידע שהוא יצר.
באופן כללי Reactor מנהלת את ההתנהגות של ביטול ונקה על ידי ה-Disposable interface.
עוד נדבר עליו בהמשך בהרחבה.
בואו נסתכל על דוגמאות שלמות של יצירת Flux/Mono והרשמה אליהם
בדוגמה הזאת אנחנו יוצרים Flux אשר מביא לנו שלושה איברים - 1,2,3.
ובשורה השניה אנחנו נרשמים ל-Flux בצורה הפשוטה ביותר.
בדוגמה הזאת אין לנו שום תוצאה שנראית לעין, אבל זה עובד.
כלומר הקוד ייצר ויעביר לנו שלושה איברים בפייפליין הזה.
אם נספק למבדה ל-subscribe אנחנו יכולים לגרום לתהליך להיות ויזבילי.
בדוגמה הבאה אנחנו נראה את אחת האופציות לכך
הפלט של הריצה הזאת הוא:
1 2 3
כדי להדגים את החתימה השלישית, אנחנו בכוונה נזרוק שגיאה :
אנחנו רואים פה כמה דברים חדשים, אז בואו נפרק את זה:
קודם כל בשורה 2 אנחנו יוצרים Flux חדש, הפעם עם 4 איברים - 1,2,3,4
לאחר מכן אנחנו משתמשים באופרטור map על מנת להמיר את האיבר שנכנס לערך חדש.
עבור רוב הערכים אנחנו מחזירים את הערך המקורי
אבל עבור ערך אחד אנחנו נחזיר שגיאה בכוונה.
בשורה 7 אנחנו נרשמים ל-Flux ומעבירים שתי למבדות:
הראשונה היא לטפל בערכים תקינים
והשניה היא לטפל בשגיאות
הפלט של הדוגמה השלישית הוא
**1 2 3 Error: java.lang.RuntimeException: Got to 4**
הדוגמה הבאה תציג את החתימה עם טיפול בערכים תקינים, טיפול בשגיאות וטיפול במפרסם שסיים בצורה תקינה
כאן אנחנו נרשמים ל-Flux ובנוסף לשתי הלמבדות שהעברנו בדוגמה הקודמת
אנחנו מעבירים למבדה שלישית, אשר תקרא כאשר ה-Flux ישלח את הסיגנל שהוא סיים לפרסם בהצלחה.
הפלט יראה ככה
**1 2 3 4 Done**
1
2
3
4
Done
סיגנלים של שגיאה וסיום מוצלח הם שניהם סיגנלים סופיים (terminals) והם לא יכולים לבוא ביחד.
כלומר, אין אפשרות שאותו publisher ישלח גם error signal וגם completion signal.
כדי שהלמבדה שנקראת ב-completion signal אנחנו חייבים לוודא שאין שגיאה שנזרקת.
ביטול subscribe עם Disposable
כמו שכבר כתבתי למעלה, כל החתימות של ה-subscribe מחזירות אובייקט מסוג Disposable.
במקרה שלנו, ה-Disposable מייצג את העובדה שאנחנו יכולים לבטל את ה-subscription על ידי קריאה למתודה dispose().
עבור Flux ו-Mono שליחת cancellation signal היא סימן שהם צריכים להפסיק לפרסם אלמנטים חדשים.
אבל אין לנו הבטחה שזה יקרה מיד, יש מקורות אשר מפרסמים אלמנטים כל כך מהר, אשר יש סיכוי מסוים שהם יסיימו לפרסם לפני שהם קיבלו את הודעת הביטול.
חלק מהיכולות של Disposable קיימות תחת המחלקה Disposables. ביניהן, Disposables.swap() יוצר Disposable לבטל ולהחליף בצורה אוטומטית Disposable קיים.
זה יכול להיות שימושי, למשל, אנחנו נרצה ב-UI לבטל בקשה קיימת והחליף אותה עם בקשה חדשה בכל פעם שהיוזר שלנו לוחץ על כפתור.
כאשר אנחנו קוראים ל **dispose()** אנחנו בעצם סוגים את הצ'אנל, ובכך אנחנו זורקים לפח את הערך הנוכחי שאמור להגיע וכל ערך עתידי.
עוד יכולת מעניינת היא ה Disposables.composite(). ההרכבה נותנת לנו את האפשרות לאסוף כמה Disposable ולבטל את כולם בבת אחת.
למשל, אם יש לנו כמה בקשות באוויר, ואנחנו רוצים לבטל את כולם. ברגע שנקרא למתודה dispose() על ההרכבה, כל ניסיון להוסיף Disposable להרכבה יבטל אותו מיידית.
אלטרנטיבה ל Subscribe עם למבדות
יש דרך אחרת לבצע subscribe, שהיא יותר גנרית מאשר שימוש בלמבדות.
אנחנו יכולים להעביר Subscriber מאשר להרכיב אחד.
ספריית Reactor מספקת לנו מחלקה בשם BaseSubscriber אשר עוזרת לנו לכתוב Subscriber משלנו.
שימו לב!!
שימו ב-BaseSubscriber או בכל אחת מהמחלקות היורשות שלו הוא חד-פעמי, כלומר אם ה-BaseSubscriber מבטל את ההרשמה שלו ל-publisher בצורה אוטומטית אם הוא נרשם ל-publisher אחר.
עכשיו אנחנו יכולים לממש אחד כזה. אנחנו נקרא לו SampleSubscriber:
המחקלה SampleSubscriber מרחיבה את BaseSubscriber, שכמו שכבר אמרנו היא מחלקה אבסטרקטית אשר נותנת את האפשרות להרחיב אותה על מנת ליצור Subscribers מותאמים לרצונות שלנו.
המחלקה חושפת hooks אשר אפשר לדרוס אותם על מנת לשנות את ההתנהגות של ה-Subscriber.
באופן דיפולטיבי, ה-BaseSubscriber מתנהגת כמו subscribe רגיל, עם בקשה לקבל אלמנטים בכמות לא מוגבלת.
אבל, דריסה של ההתנהגות הדיפולטיבית היא מאוד שימושית אם אנחנו רוצים להגביל את האלמנטים שיגיעו אלינו בבקשה הבאה.
למי שאינו מכיר, חלק מהשיח בין ה subscriber לבין ה-publisher, ה-subscriber אחרי שהוא מטפל באלמנטים שהוא קיבל, הוא מודיע ל-publisher כמה אלמנטים הוא מוכן לקבל בפעם הבאה.
על מנת להגדיר ב-Reactor את כמות האלמטים שאנחנו מוכנים לטפל בהם בכל פעם, אנחנו צריכים לממש את hookOnSubscribe(**Subscription subscription)** ואת hookOnNext(T value) כמו שיש לנו בדוגמה למעלה. בדוגמה כאשר המתודה hookOnSubscribe היא מדפיסה את המחרוזה "Subscribed" למסך ומבקשת את האלמנט הראשון. כאשר אנחנו מקבלים אותו, המדותה hookOnNext נקראת, והיא מדפיסה למסך את הערך שהיא קבילה, ומבקשת את הערך הבא.
ככה אנחנו יכולים להשתמש ב-SampleSubscriber עם **Flux**:
הפלט של הקוד הזה
Subscribed 1 2 3 4
ה-BaseSubscriber חושף לנו גם את המתודה requestUnbounded() אשר מאפשר לנו לעבור למצב לא מוגבל, בדיוק כמו ב-subscribe() (זהה לחלוטין ל-request(Long.MAX_VALUE)), וגם את המתודה **cancel()**.
הוא גם חושף לנו את המתודות hookOnComplete, hookOnError, hookOnCancel ואת hookFinally (שתמיד נקרא כאשר סוגרים רצף, עם הסיגנל שמועבר כפרמטר מסוג SignalType).
אנחנו כמעט תמיד נרצה לממש את hookOnError, hookOnCancel ואת hookOnComplete.
ה-**SampleSubscriber** הוא המימוש המינמאלי ביותר לטיפול בבקשות מרובות
Backpressure ודרכים לשינוי בקשות
קודם כל בואו ננסה להבין בקצרה מה זה בכלל backpressure?
Back pressure (or backpressure) is a resistance or force opposing the desired flow of fluid through pipes, leading to friction loss and pressure drop.
The term back pressure is a misnomer, as pressure is a scalar quantity, so it has a magnitude but no direction.
בעולם התוכנה, אנחנו יכולים לחשוב על publisher ששולח מידע מאוד מהר, ועל subscriber שצורך את המידע מאוד לאט.
המנגנון של backpressure מונע מה-publisher להציף את ה-subscriber במידע.
כאשר אנחנו מממשים backpressure ב-Reactor, ה-subscriber שולח request אל ה-publisher (אני עושה פה הפשטה, זה הרבה יותר מסובך מזה).
סה"כ בקשות לפעמים נקרא ה"דרישה" או "בקשות ממתינות". המקסימום בקשות אשר אפשר לבקש הוא **Long.MAX_VALUE**, אשר מייצג שאין לנו הגבלה. כלומר אנחנו מעבירים מסר ל-publisher שלנו של "שלח הכי מהר שאתה מסוגל", ובעצם מבטל את מנגנון ה-backpressure.
ה-request הראשונה מגיעה מה-subscriber האחרון בזמן ההרשמה שלו, אבל חשוב לשים לב, שרוב הדרכים הדיפולטיבות לבצע subscription יבטלו באופן מיידי את ה-backpressure.
למשל:
subscribe()ורוב הגרסאות שלו אשר מקבלות למבדות.שימוש באופרטורים
block(), blockFirst(), blockLast()(נדבר עליהם בהרחבה בהמשך)toIterable(), toStream()
הדרך הפשוטה ביותר לדרוס את ההתנהגות הזאת, היא על ידי דריסה של המתודה hookOnSubscribe של BaseSubscriber כמו בדוגמה הבאה
כאן אנחנו יכולים לראות שימוש באופטרטורים חדשים.
האופרטור doOnRequest מגדיר התנהגות מסוימת שה-Flux שלנו מבצע כאשר הוא מקבל בקשה לאלמנטים נוספים.
ואנחנו דורסים את ה-BaseSubscriber עם התנהגות חדשה.
כאשר מבצעים subscription הוא יבקש אלמנט בודד מה-publisher, וכאשר מגיע האלמנט הוא ידפיס את ההודעה ואז יבטל את עצמו.
הפלט נראה ככה:
**request of 1 Cancelling after having received 1**
כאשר אנחנו משנים את ההתנהגות של subscriber אנחנו צריכים להיות מאוד זהירים, ולדאוג שתהיה מספיק דרישה לאלמנטים נוספים כדי שה-publisher יוכל להתקדם, אחרת ה-Flux יכול ל"התקע".
זה למה ההתנהגות הדיפולטיבית של BaseSubscriber היא לא להגביל.
סיכום
בפוסט הזה התקדמנו עם הידע שיש לנו על Reactor
ראינו איך אנחנו יכולים ליצור Flux או Mono ומה הדרכים שאנחנו יכולים לבצע subscription אליהם.
בנוסף, למדנו איך אנחנו יכולים לבטל subscription בצורה נוחה ויעילה.
אחרי זה למדנו איך ליצור subscriber משלנו, ואיך אנחנו יכולים באמצעותו לדרוס את ההתנהגות הדיפולטיבית על מנת לטפל ב-backpressure.
מקווה שנהניתם, ואשמח שתעזרו לבלוג לגדול על ידי שיתוף של הפוסט הזה או פוסט אחר שאהבתם.
וכמו תמיד, אשמח לקבל כל הערה, הארה או שאלה שיש לכם.
