كيفية كتابة إطار عمل MapReduce في بيثون

وسيلة سهلة لمتابعة الدليل

يقول بوبيت: "حاولت أن أشرح قدر استطاعتي". "أنا أعتقد أنني صنعت تشابهًا حول الكعكة". "من لا يشبه تشبيه الكيك الجيد؟"

لا أعرف لماذا قررت تسمية هذا المخطط الإطاري ، لكني أحب هذا الاسم والجميع يحب الكعكة .. على أي حال ...

MapReduce هو نموذج أنيق يبسط معالجة مجموعات البيانات مع الكثير من الأشياء (مثل مجموعات البيانات الكبيرة). كنتيجة لمشروع عطلة نهاية الأسبوع ، هناك تنفيذ إطار عمل Python MapReduce شديد التبسيط. في هذا المنشور ، سيتم إرشادك من خلال الخطوات التي اتبعتها وتطبيق مثال لحساب الكلمات المطبقة على "الكشف عن مواز: قصة حب" مأخوذة من مشروع غوتنبرغ. النسخة النهائية من الكود موجودة كخريطة على جيثب. فيما يلي بعض الخيارات التي تم تنفيذها للتنفيذ:

  • نحن نستخدم CPython الإصدار 2.7.6.
  • يتم استخدام وحدة المعالجة المتعددة لتفرخ العمليات ، عن طريق استدعاء الأسلوب start () على كائن Process تم إنشاؤه.
  • هناك ملف الإخراج المقابلة لكل الخيط الحد.
  • يمكن دمج المخرجات في ملف واحد في النهاية.
  • يتم تخزين نتائج خطوة الخريطة (وكذلك ملفات الإخراج لكل مؤشر ترابط مختزل) في الذاكرة باستخدام JavaScript Object Notation (JSON).
  • يمكن للمرء اختيار حذف أو ترك هذه الملفات في النهاية.

إذا لم تكن معتادًا على إطار عمل mapreduce ، ستجد مقدمة لطيفة عنه على هذا الرابط في Quora. استمتع :)

تطبيق فئة MapReduce

أولاً ، ما سنفعله هو كتابة فصل MapReduce سيلعب دور الواجهة التي سيتم تنفيذها من قبل المستخدم. سيكون لهذه الفئة طريقتان: مخطط ومخفض يجب تنفيذهما لاحقًا (يتم عرض مثال تنفيذ لعدد الكلمات باستخدام MapReduce أدناه في المقطع مثال على عدد الكلمات). ومن هنا نبدأ بكتابة الفصل التالي:

اعدادات مهمه
فئة MapReduce (كائن):
    فئة "MapReduce" التي تمثل نموذج mapreduce
    ملاحظة: يجب أن تكون طرق "معين" و "المخفض"
    نفذت لاستخدام نموذج mapreduce.
    "" "
    def __init __ (self، input_dir = settings.default_input_dir، output_dir = settings.default_output_dir،
                 n_mappers = settings.default_n_mappers، n_reducers = settings.default_n_reducers،
                 نظيفة = صحيح):
        "" "
        : param input_dir: دليل لملفات الإدخال ،
        مأخوذة من الإعدادات الافتراضية إذا لم تقدم
        : param output_dir: دليل ملفات الإخراج ،
        مأخوذة من الإعدادات الافتراضية إذا لم تقدم
        : param n_mappers: عدد مؤشرات ترابط معين ،
        مأخوذة من الإعدادات الافتراضية إذا لم تقدم
        : param n_reducers: عدد مؤشرات ترابط المخفض الواجب استخدامها ،
        مأخوذة من الإعدادات الافتراضية إذا لم تقدم
        : param clean: اختياري ، إذا كانت الملفات المؤقتة الحقيقية هي
        حذف ، صحيح افتراضيا.
        "" "
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.n_mappers = n_mappers
        self.n_reducers = n_reducers
        التنظيف الذاتي = نظيف
    def mapper (self، key، value):
        "" "إخراج قائمة أزواج قيمة المفتاح ، حيث يكون المفتاح
        يحتمل أن تكون جديدة والقيم من نوع مختلف محتمل.
        ملاحظة: هذه الوظيفة يجب تنفيذها.
        : مفتاح بارام:
        : القيمة الأساسية:
        "" "
        البشري
    def مخفض (الذاتي ، مفتاح ، value_list):
        "" "إخراج قيمة واحدة مع المفتاح المقدم.
        ملاحظة: هذه الوظيفة يجب تنفيذها.
        : مفتاح بارام:
        : param value_list:
        "" "
        البشري

لشرح الإعدادات المختلفة ، راجع قسم وحدة الإعدادات أدناه. بعد ذلك ، سنحتاج إلى إضافة طريقة تشغيل () ، لـ MapReduceclass ، ستنفذ الخريطة وتقلل من العمليات. لهذا نحتاج إلى تحديد طريقة (فهرس) run_mapper (حيث يشير الفهرس إلى الخيط الحالي) ، والتي ستستخدم المخطط وتخزين النتائج على القرص ، و run_reducer (الفهرس) الذي سيطبق المخفض على نتائج خريطة وتخزين النتائج على القرص. ستعمل طريقة التشغيل () على إنتاج العدد المرغوب فيه من المخططين ثم عدد المخفضات المرغوب فيه. يتم استخدام كائن عملية من وحدة المعالجة المتعددة كما يلي:

def run_mapper (ذاتي ، فهرس):
    "" "يقوم بتشغيل المخطط المطبق
    : فهرس المعلمة: مؤشر الخيط لتشغيل
    "" "
    # قراءة مفتاح
    # قراءة القيمة
    # الحصول على نتيجة معين
    # تخزين النتيجة لاستخدامها من قبل المخفض
    البشري
def run_reducer (self، index):
    "" "يشغل المخفض المطبق
    : فهرس المعلمة: مؤشر الخيط لتشغيل
    "" "
    # تحميل نتائج الخريطة
    # لكل مفتاح يقلل من القيم
    # تخزين النتائج لهذا المخفض
    البشري
المدى def (النفس):
    "" "ينفذ الخريطة ويقلل العمليات
    "" "
    # تهيئة قائمة المصممين
    map_workers = []
    # تهيئة قائمة المخفضات
    rdc_workers = []
    # قم بتشغيل خطوة الخريطة
    لـ thread_id في النطاق (self.n_mappers):
        p = العملية (الهدف = self.run_mapper ، args = (thread_id ،))
        p.start ()
        map_workers.append (ع)
    [t.join () لـ t في map_workers]
    # تشغيل الخطوة الحد
    لـ thread_id في النطاق (self.n_reducers):
        p = العملية (الهدف = self.run_reducer ، args = (thread_id ،))
        p.start ()
        map_workers.append (ع)
    [t.join () لـ t في rdc_workers]

الآن ، يجب علينا إكمال أساليب run_mapper و run_reducer. ولكن نظرًا لأن هذه الأساليب تتطلب قراءة وتخزين البيانات من ملف إدخال واحد ، سنقوم أولاً بإنشاء فئة FileHandler. ستقوم هذه الفئة بتقسيم ملف الإدخال باستخدام طريقة (number_of_splits) split_file (حيث يكون عدد الانقسامات هو إجمالي عدد القطع التي نريدها نتيجة الانقسام). ستنضم فئة FileHandler أيضًا إلى المخرجات باستخدام طريقة ajoin_files (number_of_files ، نظيفة ، مرتبة ، متناقصة) (حيث يمثل number_of_files العدد الإجمالي للملفات للانضمام ، والتنظيف ، والفرز ، والزيادة ، كل الوسائط المنطقية الاختيارية مضبوطة على True افتراضياً في حالتنا. يشير clean إلى ما إذا كنا نريد حذف الملفات المؤقتة بعد الانضمام ، يشير الفرز إلى ما إذا كنت تريد فرز النتائج أم لا ، بينما يشير الانخفاض إلى ما إذا كنا نريد الفرز بترتيب عكسي). نحن نضع هذا في الاعتبار ، نبدأ بكتابة كائن FileHandler كما يلي:

فئة FileHandler (كائن):
    فئة "" FileHandler
    يدير تقسيم ملفات الإدخال وربط المخرجات معًا.
    "" "
    def __init __ (self، input_file_path، output_dir):
        "" "
        ملاحظة: يجب إعطاء مسار ملف الإدخال للتقسيم.
        مطلوب دليل الإخراج للانضمام إلى المخرجات.
        : param input_file_path: مسار ملف الإدخال
        : param output_dir: مسار دليل الإخراج
        "" "
        self.input_file_path = input_file_path
        self.output_dir = output_dir
    def split_file (self ، number_of_splits):
        "" "قسم الملف إلى ملفات متعددة.
        : param number_of_splits: عدد الانقسامات.
        "" "
        البشري
    def join_files (ذاتي ، number_of_files ، نظيف = بلا ، فرز = صحيح ، تناقص = صحيح):
        "" "ربط جميع الملفات في دليل الإخراج إلى
        ملف الإخراج واحد.
        : param number_of_files: إجمالي عدد الملفات.
        : param clean: إذا كان سيتم حذف النواتج الحقيقية ،
        افتراضيا يأخذ قيمة self.clean.
        : نوع بارام: فرز المخرجات.
        : تناقص param: فرز حسب تناقص القيمة العالية
        لقيمة منخفضة.
        : return output_join_list: قائمة المخرجات
        "" "
        البشري

بعد ذلك ، نكمل طرق التقسيم والانضمام:

استيراد نظام التشغيل
استيراد جسون
فئة FileHandler (كائن):
    فئة "" FileHandler
    يدير تقسيم ملفات الإدخال وربط المخرجات معًا.
    "" "
    def __init __ (self، input_file_path، output_dir):
        "" "
        ملاحظة: يجب إعطاء مسار ملف الإدخال للتقسيم.
        مطلوب دليل الإخراج للانضمام إلى المخرجات.
        : param input_file_path: مسار ملف الإدخال
        : param output_dir: مسار دليل الإخراج
        "" "
        self.input_file_path = input_file_path
        self.output_dir = output_dir
    def start_file_split (self ، split_index ، index):
        تهيئة "" "ملف تقسيم عن طريق فتح فهرس وإضافة.
        : param split_index: فهرس الانقسام الذي نستخدمه حاليًا ، لاستخدامه لتسمية الملف.
        : فهرس المعلمة: الفهرس المعطى للملف.
        "" "
        file_split = open (settings.get_input_split_file (split_index-1) ، "w +")
        file_split.write (str (index) + "\ n")
        إرجاع file_split
    def is_on_split_position (الذات ، الشخصية ، الفهرس ، split_size ، current_split):
        "" "تحقق مما إذا كان هذا هو الوقت المناسب للانقسام.
        أي: الحرف هو مسافة وتم الوصول إلى الحد الأقصى.
        : الشخصية البارزة: الشخصية التي نحن عليها الآن.
        : مؤشر بارام: المؤشر الذي نحن عليه حاليا.
        : param split_size: حجم كل انقسام واحد.
        : param current_split: الانقسام الذي نحن عليه الآن.
        "" "
        مؤشر الإرجاع> split_size * current_split + 1 و character.isspace ()
    def split_file (self ، number_of_splits):
        "" "قسم الملف إلى ملفات متعددة.
        ملاحظة: لم يتم تحسين هذا لتجنب النفقات العامة.
        : param number_of_splits: عدد القطع إلى
        تقسيم الملف إلى.
        "" "
        file_size = os.path.getsize (self.input_file_path)
        unit_size = file_size / number_of_splits + 1
        original_file = open (self.input_file_path، "r")
        file_content = original_file.read ()
        original_file.close ()
        (الفهرس ، current_split_index) = (1 ، 1)
        current_split_unit = self.begin_file_split (current_split_index ، فهرس)
        للحرف في file_content:
            current_split_unit.write (حرف)
            إذا self.is_on_split_position (الحرف ، الفهرس ، unit_size ، current_split_index):
                current_split_unit.close ()
                current_split_index + = 1
                current_split_unit = self.begin_file_split (current_split_index ، فهرس)
            مؤشر + 1
        current_split_unit.close ()

الآن ، يمكننا إكمال طرقنا run_mapper و run_reducer مثل:

def run_mapper (ذاتي ، فهرس):
    "" "يقوم بتشغيل المخطط المطبق
    : فهرس المعلمة: مؤشر الخيط لتشغيل
    "" "
    input_split_file = open (settings.get_input_split_file (الفهرس) ، "r")
    مفتاح = input_split_file.readline ()
    القيمة = input_split_file.read ()
    input_split_file.close ()
    إذا (self.clean):
        os.unlink (settings.get_input_split_file (فهرس))
    mapper_result = self.mapper (المفتاح ، القيمة)
    بالنسبة لـ reduer_index في النطاق (self.n_reducers):
        temp_map_file = open (settings.get_temp_map_file (الفهرس ، reduer_index) ، "w +")
        json.dump ([(مفتاح ، قيمة) لـ (مفتاح ، قيمة) في mapper_result
                                    إذا self.check_position (مفتاح ، reduer_index)]
                    ، temp_map_file)
        temp_map_file.close ()
    
def run_reducer (self، index):
    "" "يشغل المخفض المطبق
    : فهرس المعلمة: مؤشر الخيط لتشغيل
    "" "
    key_values_map = {}
    بالنسبة إلى mapper_index في النطاق (self.n_mappers):
        temp_map_file = open (settings.get_temp_map_file (mapper_index ، الفهرس) ، "r")
        mapper_results = json.load (temp_map_file)
        لـ (مفتاح ، قيمة) في mapper_results:
            إن لم يكن (المفتاح في key_values_map):
                key_values_map [key] = []
            محاولة:
                key_values_map [مفتاح] .append (القيمة)
            باستثناء استثناء ، ه:
                طباعة "استثناء أثناء إدخال المفتاح:" + str (e)
        temp_map_file.close ()
        إذا الذات.نظافة:
            os.unlink (settings.get_temp_map_file (mapper_index ، الفهرس))
    key_value_list = []
    للمفتاح في key_values_map:
        key_value_list.append (self.reducer (key، key_values_map [key]))
    output_file = open (settings.get_output_file (index) ، "w +")
    json.dump (key_value_list ، output_file)
    output_file.close ()

وأخيرًا ، قمنا بتعديل طريقة التشغيل قليلاً لتمكين المستخدم من تحديد ما إذا كان يجب الانضمام إلى المخرجات أم لا. طريقة التشغيل تصبح:

def def (self، join = False):
    "" "ينفذ الخريطة ويقلل العمليات
    : param join: صحيح إذا كنا بحاجة إلى الانضمام إلى المخرجات ، False بشكل افتراضي.
    "" "
    # تهيئة قائمة المصممين
    map_workers = []
    # تهيئة قائمة المخفضات
    rdc_workers = []
    # قم بتشغيل خطوة الخريطة
    لـ thread_id في النطاق (self.n_mappers):
        p = العملية (الهدف = self.run_mapper ، args = (thread_id ،))
        p.start ()
        map_workers.append (ع)
    [t.join () لـ t في map_workers]
    # تشغيل الخطوة الحد
    لـ thread_id في النطاق (self.n_reducers):
        p = العملية (الهدف = self.run_reducer ، args = (thread_id ،))
        p.start ()
        map_workers.append (ع)
    [t.join () لـ t في rdc_workers]
    إذا الانضمام:
        self.join_outputs ()

الرمز النهائي موجود في مستودع github MapCakes على الرابط التالي: https://github.com/nidhog/mapcakes

وحدة "الإعدادات"

تحتوي هذه الوحدة على الإعدادات الافتراضية ووظائف الأداة المساعدة لإنشاء أسماء المسارات لملفات الإدخال والإخراج والملفات المؤقتة. يتم وصف طرق المساعدة هذه في تعليقات مقتطف الشفرة أدناه:

# تعيين الدليل الافتراضي لملفات الإدخال
default_input_dir = "input_files"
# تعيين الدليل الافتراضي لملفات الخريطة المؤقتة
default_map_dir = "temp_map_files"
# تعيين الدليل الافتراضي لملفات الإخراج
default_output_dir = "output_files"
# تعيين الرقم الافتراضي للخريطة وتقليل المواضيع
default_n_mappers = 4
التخفيضات الافتراضية = 4
# إرجاع اسم ملف الإدخال لتقسيمه إلى أجزاء
def get_input_file (input_dir = None ، extension = ".ext"):
    إن لم يكن (input_dir is None):
        إرجاع input_dir + "/ ملف" + ملحق
    إرجاع default_input_dir + "/ file" + ملحق
    
    
# إرجاع اسم ملف الانقسام الحالي المطابق للفهرس المحدد
def get_input_split_file (index، input_dir = None، extension = ".ext"):
    إن لم يكن (input_dir is None):
        إرجاع input_dir + "/ file _" + str (index) + extension
    إرجاع default_input_dir + "/ file_" + str (index) + extension
        
        
# ارجع اسم ملف الخريطة المؤقت المطابق للفهرس المحدد
def get_temp_map_file (الفهرس ، المخفض ، output_dir = None ، extension = ".ext"):
    إن لم يكن (output_dir is None):
        إرجاع الإخراج_dir + "/ map_file_" + str (الفهرس) + "-" + str (المخفض) + الامتداد
    إرجاع default_output_dir + "/ map_file_" + str (الفهرس) + "-" + str (المخفض) + الامتداد
# إرجاع اسم ملف الإخراج بالنظر إلى الفهرس المقابل
def get_output_file (index، output_dir = None، extension = ".out"):
    إن لم يكن (output_dir is None):
        قم بإرجاع output_dir + "/ redu_file _" + str (index) + extension
    إرجاع default_output_dir + "/ redu_file_" + str (index) + extension
        
# إرجاع اسم ملف الإخراج
def get_output_join_file (output_dir = بلا ، ملحق = ".out"):
    إن لم يكن (output_dir is None):
        return output_dir + "/ output" + extension
    إرجاع default_output_dir + "/ output" + ملحق

مثال على عدد الكلمات

في هذا المثال ، نفترض أن لدينا مستندًا ونريد حساب عدد مرات تواجد كل كلمة في المستند. للقيام بذلك ، نحتاج إلى تحديد خريطتنا وتقليص العمليات حتى نتمكن من تنفيذ أساليب معين ومخفض من فئة MapReduce. الحل لعدد الكلمات بسيط جداً:

  • map: نقسم النص ، نأخذ الكلمات التي تحتوي على أحرف ascii فقط ونصغر الكلمات. بعد ذلك ، نرسل كل كلمة كمفتاح مع عدد 1.
  • تقليل: نحن ببساطة جمع كل القيم السابقة لكل كلمة.

وبالتالي ، فإننا ننفذ فئة MapReduce على النحو التالي:

من mapreduce استيراد MapReduce
استيراد أنظمة
الفئة WordCount (MapReduce):
    def __init __ (self، input_dir، output_dir، n_mappers، n_reducers):
        MapReduce .__ init __ (self، input_dir، output_dir، n_mappers، n_reducers)
    def mapper (self، key، value):
        "" "خريطة وظيفة لمثال عدد الكلمات
        ملاحظة: يجب فصل كل سطر إلى كلمات وكل كلمة
        يحتاج إلى تحويل إلى حالة صغيرة.
        "" "
        النتائج = []
        default_count = 1
        # خط منفصل في الكلمات
        للكلمة في value.split ():
            إذا self.is_valid_word (كلمة):
                # كلمات صغيرة

هذا كل ما لدي أيها الناس! :د

شكرا للقراءة! :) إذا استمتعت به ، اضغط على زر القلب أدناه. يعني الكثير بالنسبة لي ويساعد الآخرين على رؤية القصة.

أحب بيثون؟ إليك برنامج تعليمي حول كيفية إنشاء chatbot لفترة قصيرة في أقل من ساعة.