תכנות ריאקטיבי - Reactor - יצירת רצף מותאם אישית

·

7 דקות קריאה

שלום לכולם,
הפעם נמשיך לדבר על ספריית Reactor ועל תכנות ריאקטיבי.
בפוסט הקודם למדנו איך אנחנו יכולים ליצור Flux או Mono, ואיך אנחנו יכולים להרשם אליהם (subscribe)
עד עכשיו השתמשנו במתודות מוגדרות של ג'אווה על מנת ליצור את הרצפים שלנו, אבל לפעמים אנחנו רוצים לשלוט ברמה יותר גבוהה מה האיברים שאנחנו שולחים, ומתי אנחנו מסיימים.

יצירה בצורה סינכרונית באמצעות generate

הצורה הקלה ביותר ליצור Flux היא באמצעות המתודה generate אשר מקבל פונקציה שבאמצעותה יוצרים את ה-Flux.

הצורה הזאת היא סינכרונית לחלוטין, ויש לו יחס של אחד-לאחד על הפלט שלו. כלומר ה-sync שאנחנו יוצרים הוא SynchronousSink, ואפשר לקרוא למתודה **next()** שלה לכל היותר פעם אחת עבור כל קריאה של ה-callback. לאחר מכן אפשר לקרוא גם ל-error(Throwable) ול-complete(), אבל זה לא חובה.

ככל הנראה הדרך הכי שימושית תהיה להעביר אובייקט המציין מהו ה-state שלנו, כדי להחליט מה האיבר הבא שאנחנו פולטים ברצף. הפונקציה המייצרת הופכת להיות BiFunction<S, SynchronousSink<T>, S> כאשר <S> הוא הסוג של האובייקט state וה-<T> הוא סוג האיברים שהרצף שלנו מחזיר. אנחנו צריכים לספק Supplier<S> על מנת לקבוע את הערך ההתחלתי של ה-state, והפונקציה שלנו יכולה להחזיר את הערך החדש של state.

לדוגמה, אנחנו יכולים להשמתמש ב-int בתור ה-state שלנו:

אז בואו נפרק את זה, אנחנו מתחילים בקריאה לפונקציה generate ומעבירים את הערך ההתחלתי של ה-state להיות 0. לאחר מכן, אנחנו משתמשים בערך של ה-state על מנת לחשב את הערך הבא שהרצף שלנו יחזיר החוצה. במקרה שלנו מדובר על מחרוזת המראה את החישוב של כפל בין המספר 3 לבין ה-state הנוכחי בכל שלב
בשורה מספר 3, אנחנו מחליטים האם אנחנו צריכים לעצור, או לא. אם לא עצרנו אז אנחנו נעלה את הערך של state לגודל ב-1 עבור כל איבר.
שימו לב לכמה דברים. קודם כל אנחנו צריכים להחזיר את הערך החדש של ה-state בסוף כל איטרציה של הרצף.
בשורה 8, אני משתמש ב-flux.doOnNext, על מנת לקבל את האיברים שיצרתי, ואז אני משתמש ב-blockLast(). זה אופרטור חדש שעוד לא ראינו, מה שהוא מבצע הרשמה לרצף, ודואג שהקוד יחכה עד שנקבל הודעת complete או הודעת error.
שנית, אנחנו משתמשים ב-sink על מנת להודיע לכל מי שנרשם לרצף שאנחנו כותבים, מה האלמנטים שהם מקבלים, על ידי קריאה ל sink.next(), ומתי הם מקבלים הודעה על סיום תקין על ידי קריאה ל sink.complete().

הפלט של הריצה הזאת הוא:

3 X 0 = 0 3 X 1 = 3 3 X 2 = 6 3 X 3 = 9 3 X 4 = 12 3 X 5 = 15 3 X 6 = 18 3 X 7 = 21 3 X 8 = 24 3 X 9 = 27 3 X 10 = 30

אפשרות נוספת היא להעביר אובייקט שהוא mutable בתור ה-state שלנו.
למשל את הדוגמה הקודמת אנחנו יכולים לממש על ידי העברת AtomicLong בתור ה-state שלנו, ולשנות את הערך שלו תוך כדי הריצה.
זה יראה ככה:

יש פה שני שינויים, הראשון הוא שאנחנו מעבירים את AtomicLong בתור ה-state שלנו.
שנית, הפעם אנחנו לא מייצרים אובייקט Flux ועליו מבצעים פעולות, אלא אנחנו משרשרים את את הפעולות על אותו אובייקט.

יש עוד גרסה של הפונקציה generate, והיא נראית ככה: `generate(Supplier, BiFunction, Consumer)` שהיא מאפשר לנו להעביר פונקציה **Consumer<S>** אשר תקבל את ה-state האחרון, והיא יכולה לבצע פעולות עליו
זה שימושי במקרה שאנחנו רוצים לנקות משאבים של ה-state שלנו.
הנה דוגמה לאיך קוראים לגרסה הזאת

במקרה הזה, אנחנו מדפיסים את הערך האחרון של ה-state שיש לנו.
הפלט כאן יהיה state: 11 כיוון שבשורה 2 אנחנו מעלים את הערך של ה-state שלנו אחרי שאנחנו מקבלים את הערך 10 לתוך i, ולכן ה-state הוא 11.

יצירה בצורה אסינכרונית באמצעות create

המתודה create היא צורה יותר מתקדמת של יצירת Flux בצורה דינאמית אשר מותאמת לפרסום של אלמנטים מרובים, אפילו כאלה שמגיעים מכמה threads שונים.
המתודה חושפת **FluxSink** יחד עם המתודות next, error, complete שלו, ובניגוד ל-generate אין לו state.
מצד שני, היא יכולה להטריג כמה threads שונים מתוך הקולבק שלה.
שימוש די נפוץ למתודה create היא לחבר בין API אסינכרוני שמשתמש ב-listeners לתכנות הריאקטיבי.

⚠️ המתודה create לא הופכת את הקוד שלנו לאסינכרוני, ולא מבצעת שום מקביליות, למרות שאפשר להשתמש בה יחד עם קוד אסינכרוני אחר. אם ננסה לבצע block על הקוד של ה-create אנחנו חושפים את הקוד שלנו ל-deadlock ובעיות אחרות. אפילו השימוש ב-subscribeOn, קיימת האפשרות שמישהו ינצל את זה לרעה על מנת ליצור קוד אינסופי שיחסום את המקביליות שלנו, כמו למשל לולאה אינסופית אשר קוראת ל- sink.next(t). במקרה כזה הקוד יחסום את הפייפליין שיצרנו.
ולכן הכלל אצבע שלנו הוא להשתמש ב- subscirbeOn(Scheduler, false)

בואו נדמיין שאנחנו משתמשים ב-API שמבוסס על listener. הוא מעבד מידע ב-chunks ויש לו שני איווניטים:

  • chunk של מידע מוכן

  • העיבוד של המידע הושלם (terminal event)

הקוד נראה ככה:

עכשיו אנחנו יכולים להשתמש במתודה create על מנת להמיר את הממשק הזה ל-Flux<T>

אנחנו משתמשים ב-EventListener<String> על מנת להמיר את ה-API ל-Flux<T>
כאשר אנחנו מקבלים chunk כל אלמנט בו הופך להיות אלמנט ב-Flux על ידי קריאה למתודה sink::next. ואנחנו ממירים את processComplete() ל-sink.complete()
כל הקוד הזה מתבצע בצורה אסינכרונית בכל פעם שה-myEventProcessor רץ.

בנוסף, כיוון ש-**create** יכול לגשר על API אסינכרונים וגם לנהל backpressure, אנחנו יכולים מהי האסטרטגיה שאנחנו רוצים לנהל בה את ה-backpressure על ידי ציון ה-overflowStrategy:

  • IGNORE - להתעלם לחלוטין מכל הבקשות הבאות

  • ERROR - לזורק שגיאה כאשר ה-downstream לא יכול לעמוד בקצב

  • DROP - להפיל את הסיגנל הבא אם ה-downstream לא מוכן לקבל אותו

  • LATEST - לשלוח ל-downstream רק את הסיגנל האחרון

  • BUFFER - הגישה הדיפולטיבית, שומר בבאפר את כל הסיגנלים אם ה-downstream לא מוכן לקבל אותם.

למי שמתעניין עוד בהבדלים הקטנים בין הגרסאות הללו אני ממליץ בחום על הפוסט הזה מהבלוג its all binary

יצירה בצורה אסינכרונית ב-thread בודד

יש לנו דרך ביניים בין יצירה סינכרונית באמצעות generate() לבין יצירה אסינכרונית באמצעות create() יש לנו דרך לבצע משהו ביניים, יצירה של רצף אסינכרוני אבל שירוץ על thread בודד באמצעות push().
הוא דומה ל-create בכך שהוא מסוגל להיות אסינכרוני ולנהל backpressure, אבל ההבדל הוא שב-push אפשר שיהיה רק thread שמפרסם אלמנטים יחיד. ורק ב-thread הזה אנחנו יכולים לקרוא ל-next, error, complete

הנה דוגמה:

כאן אנחנו יוצרים באופן די דומה לדוגמה הקודמת Flux אבל הפעם עם push

מודל היברידי של push/pull

רוב האופרטורים של ספריית reactor, כמו **create**, עובדים במודל של push/pull. הכוונה שלי היא שלמרות שרוב העבודה מתבצעת בצורה אסינכרונית (מה שמרמז על מודל של push), יש עדיין חלק של pull, החלק של הבקשה.
ה-consumer לא ימשוך מידע עד שהוא יתבקש לכך, אבל ה-producer ישלח את המידע ל-consumer ברגע שהמידע מוכן, אבל במסגרת הגבולות של הבקשה שהוא קיבל.
כדי לנהל את עניין הבקשות, עבור שני האופרטורים push(), create() יש להם מתודה בשם onRequest על מנת להגדיר consumer שינהל את גודל הבקשות, ולוודא שהמידע נדחף דרך ה-sink רק כאשר יש לנו בקשה

בדוגמה הזאת אנחנו משתמשים ב-onRequest למשוך מספר הודעות כאשר בקשה מגיעה, אם הן מוכנות אנחנו שולחים אותן, שאר ההודעות יגיעו בצורה אסינכרונית מאוחר יותר כאשר הן יהיו מוכנות.

ניקוי אחרי create, push

עכשיו אנחנו נכיר שני אופרטורים חדשים שישמשו אותנו על מנת לנקות אחרי publisher שהגדרנו onDispose, onCancel
האופרטור onDispose שימושי לביצוע ניקוי כאשר Flux מסיים בצורה תקינה, או בשגיאה, או כאשר מבטלים אותו.
האופרטור onCancel יכול לבצע כל פעולה שמעבירים לו לבצע כאשר מבטלים את ה-Flux לפי שהאופרטור onDispose נקרא

האופרטור handle

האופרטור handle הוא טיפה שונה. הוא מוגדר כ-instance method. הכוונה שלי היא שהוא מוצמד למקור קיים כמו אופרטורים אחרים (כמו map, filter, doOnNext וכו'). הוא קיים גם עבור Flux וגם עבור Mono.

הוא דומה יותר לאופרטור generate במובן שהוא מייצר אלמנטים בצורה סינכרונית ומאפשר פרסום של אבירים בודדים בכל פעם. ההבדל הוא שהוא מסוגל ערך שרירותי עבור כל אלמנט מקורי שהוא מקבל, עם אופציה לדלג על אלמנטים.
במובן הזה הוא סוג של שילוב בין map לבין filter (מי שלא מכיר אני ממליץ ללכת לקרוא את הפוסט שלי בנושא).
החתימה של המתודה היא Flux<R> handle(BiConsumer<T, SynchronusSink<R>>);

בואו נסתכל על דוגמה. ה-reactive streamse specification אוסר על ערכי null ברצף. מה יקרה אם אנחנו רוצים להשתמש ב-map אבל עם פונקציה שמוגדרת מראש, והפונקציה הזאת יכולה להחזיר null?
למשל יש את הפונקציה הבאה:

ואנחנו נשתמש ב-handle על מנת לסנן את ערכי ה-null

אנחנו משתמשים בפונקציה alpahbet על מנת למפות בין המספר לאות, ואז אנחנו מסננים את הערכים שהם null.
הפללט שלנו הוא

M I T

סיכום

בפוסט הזה לקחנו עוד צעד בהבנה שלנו איך אנחנו יכולים לשלוט בצורה בה אנחנו מעבירים מידע ברצפים שאנחנו יוצרים
למדנו על איך לפרסם דברים בצורה סינכרונית, בצורה אסינכרונית עם כמה threads או על thread בודד.
וגם למדנו איך אנחנו יכולים לסנן אלמנטים שאנחנו לא מעוניינים בהם.

את הקוד המלא אתם יכולים למצוא כאן

וכמו תמיד, אני אשמח לקבל כל הערה, הארה או שאלה שיש לכם