كيفية RxJava2: مقدمة السلسلة و PublishProcessor

لقد أمضيت الأيام القليلة الماضية متجمعين في كتلة متواضعة من البطانيات على كرسي الكمبيوتر أمام جهاز الكمبيوتر الخاص بي ، مرارًا وتكرارًا في طريقي عبر تطبيق RxJava2 Javadoc ، محاولًا اكتساب بعض أشكال فهمه. ربما أكون قاتماً بعض الشيء ، أو (على أمل ، وعلى الأرجح يبدو أكثر) RxJava2 هي ببساطة مكتبة مربكة. لقد ارتدت من وثائق Reactive Streams إلى إرشادات تصميم Rx.NET 2010 ، ومن خلال كل صفحة تقريبًا على reactivex.io ، ومع ذلك ، لم يكن الأمر كذلك حتى بدأت عملية تجريبية وأخطاء شاقة ، لقد بدأت بالفعل في اكتساب فهم لكيفية عمل هذه المكتبة. هذه المقالة هي الأولى من بين العديد من المستندات التي توثق حرجتي في أعماق RxJava2.

الفكرة الرئيسية في هذه السلسلة هي جعل هذه المفاهيم أكثر سهولة للوصول إلى الآخرين الذين هم جدد في RxJava 2. ربما أكون محظوظًا ، أو ربما هذا ليس "شيئًا" ، لكن لم أجد أيًا من المعلومات التي أجدها يبدو مباشرة بما فيه الكفاية. لذلك ، أعتزم تقديم (في نهاية المطاف) جميع المفاهيم في RxJava2 كما أتعلمها بأكثر الطرق المباشرة و "التعليمية" التي يمكنني القيام بها.

أساسيات الناشر

يمتد PublishProcessor فئة FlowableProcessor. بالامتداد ، هذا يعني أن PublishProcessor يعمل كناشر ومشترك في نفس الوقت. بسبب هذا الغرض المزدوج ، يتم تمكين الإرسال المتعدد. يقوم ناشر ProProcessor بإرسال عناصر إلى المشتركين فيه ، مما يعني أن جميع المشتركين في هذا الناشر سوف يتلقى جميع العناصر المرسلة ، حيث يتم إرسالها من المصدر [PublishProcessor].

يعد PublishProcessor مثاليًا عندما تحتاج إلى علاقة من مصدر إلى مستهلك حيث يوجد مصدر واحد والعديد من المستهلكين من نفس المصدر.

مثيل

PublishProcessor  processor = PublishProcessor.create ()؛

هذا ، بكل بساطته ، هو في الحقيقة كيفية إنشاء مثيل لـ PublishProcessor. في المثال أعلاه ، يقوم بناء الجملة بإنشاء PublisherProcessor يقوم بإرسال عناصر من النوع Object إلى المشتركين فيه ، على الرغم من أنه يمكنك جعل هذا ما تحتاج إليه.

من يمكنه الاشتراك في PublishProcessors؟

مشترك ملحقات الامتدادات التفاعلية ، هذا هو من! إليك تطبيق المشترك الأساسي ، وهو:

استيراد org.reactivestreams.Subscriber؛
استيراد org.reactivestreams.Subscription ؛

BaseSubscriber فئة عامة تنفذ المشترك  {

    اشتراك خاص

    @تجاوز
    باطل onSubscribe (اشتراك الاشتراك) {
        System.out.println ("مشترك جديد") ؛
        this.subscription = اشتراك ؛
        subscription.request (1)؛
    }

    @تجاوز
    الفراغ العام onNext (Object o) {
        System.out.println ("حصلت على:" + س) ؛
        subscription.request (1)؛
    }

    @تجاوز
    باطلة onError العامة (Throwable رمي)
        throwable.printStackTrace ()؛
    }

    @تجاوز
    الفراغ العام onComplete () {
        System.out.println ( "كاملة.")؛
    }
}

هناك بعض الأشياء المهمة التي يجب ملاحظتها في الكود أعلاه. أولاً ، في طريقة onSubscribe ، تحتاج إلى التأكد من تخزين مرجع لكائن الاشتراك. هذا حتى تتمكن من استخدام subscription.request (1) في أسلوب onNext. هذا مهم حقًا - تأكد من إجراء مكالمة لطلب الاشتراك على هيئة أسلوب onSubscribe - وهذا يضمن أن البيانات سوف تتدفق بشكل صحيح عند استدعاء onNext. إنه يشبه إلى حد كبير السفينة الدوارة - onSubscribe هو بمثابة الصعود إلى السفينة الدوارة وركوب الصعود الأول البطيء ، لكن التوقف قبل الهبوط الأول. في هذا القياس ، لن يحدث "الإسقاط" الأول حتى يتم إجراء المكالمة onNext الأولى. بعد ذلك ، يمكن أن تتدفق البيانات بحرية ، أو حتى يحدث onComplete / onError.

داخل جسم طريقة onNext هو المكان الذي يحدث فيه "السحر". هذا هو المكان الذي يجب أن توفر منطق العمل لأي احتياجاتك. الشرط الوحيد هو إجراء مكالمة للاشتراك. طلب ​​(طويل) بداخله - وهذا يضمن أن البيانات يمكن أن تستمر في التدفق بشكل صحيح.

يخبرك تطبيق أسلوب onError أعلاه ببساطة بالمكان الذي وضعت فيه الشفرة ، ويرسل onComplete رسالة وحدة تحكم أساسية لإعلامك بذلك (تم إكمال الاشتراك مع PublishProcessor في هذه الحالة) بنجاح.

وضعها معا

يستخدم هذا المثال فئة المثال BaseSubscriber من أعلى.

ثابت الفراغ الثابت الرئيسي (سلسلة [] الحجج) {

    PublishProcessor  processor = PublishProcessor.create ()؛
    مشترك BaseSubscriber = BaseSubscriber () جديد ؛
    processor.subscribeActual (subscriberA) ؛
    processor.onNext ("بعض سطر النص") ؛
    processor.onNext ("سطر آخر من النص") ؛
    processor.onNext ("سطر آخر من النص") ؛
    
    المشترك BaseSubscriber = جديد BaseSubscriber ()؛
    processor.subscribeActual (subscriberB) ؛
    processor.onNext ("بعض سطر النص الآخر") ؛
    processor.onNext ("السطر الأخير من النص") ؛
    processor.onComplete ()؛

}

يتم إرسال جميع المكالمات إلى onNext تقنيًا للمشتركين "all" ، ولكن بالنسبة للمكالمات الثلاثة الأولى ، يوجد مشترك واحد فقط ، لذلك يجب أن يظهر الإخراج مرة واحدة فقط. على المكالمات الرابعة والخامسة على onNext ، أضفنا subscriberB ، مما يعني أننا يجب أن نرى هاتين المكالمتين تتكرر مرتين في الإخراج. وأخيرًا ، يتم أيضًا إرسال الدعوة إلى onComplete إلى جميع المشتركين ، لذلك يجب أيضًا عرض رسالة الإكمال مرتين (مرة واحدة لكل مشترك تم استدعاء onCompleted).

انتاج |

مشترك جديد
حصلت: بعض سطر النص
حصلت: سطر آخر من النص
حصلت: بعد سطر آخر من النص
مشترك جديد
حصلت: بعض السطر الآخر من النص
حصلت: بعض السطر الآخر من النص
حصلت: السطر الأخير من النص
حصلت: السطر الأخير من النص
اكتمال.
اكتمال.

كما هو متوقع ، أسفرت المكالمات الثلاثة الأولى فقط لـ onNext عن سطر واحد من النص يتم عرضه نظرًا لوجود مشترك واحد فقط. في السطر الخامس من الإخراج ، نرى رسالة "مشترك جديد" للإشارة إلى أن هناك مشتركين الآن. وكما هو متوقع أيضًا ، يتم تكرار جميع خطوط الإنتاج النهائية بعدد المشتركين لديها حاليًا.

خاتمة

هذا هو أول مقال لي في العديد من المقالات حول RxJava2 - أعلم أنه يبدو من الغريب أن نبدأ بشيء محدد جدًا مثل PublishProcessor ، لكن لدي مشاريع خارجية أعمل عليها باحتياجات يمكن لهذه الفئة المحددة التعامل معها جيدًا. القرار كان بحتة بدافع من جهتي. المضي قدما ، أنوي التركيز على المزيد من المفاهيم الأساسية والبناء من هناك. آمل أن يكون هذا من بعض الاستخدام لشخص ما!