# -*- coding: utf-8 -*-
"""CollectParaphrizingData.ipynb"""

from seamless_communication.models.inference import Translator
import pickle
from tqdm import tqdm
import os
import torch
import urllib.request
import zipfile
import xml.etree.ElementTree as ET
import re
from pyarabic.araby import normalize_hamza, strip_tatweel, strip_tashkeel
#from IPython.core.display import display, HTML
from ghalatawi.autocorrector import AutoCorrector
from ghalatawi.ar_ghalat import isArabicword
from ghalatawi.ghalat_const import ReplacementTablePount
import naftawayh.wordtag as wordtag
import hunspell

# Download and unzip the OpenSubtitles Arabic corpus only if it doesn't exist.
zip_file_path = 'ar.zip'
unzip_folder = 'OpenSubtitles'

if not os.path.exists(unzip_folder):
    if not os.path.exists(zip_file_path):
        print("Downloading...")
        urllib.request.urlretrieve(
            'https://object.pouta.csc.fi/OPUS-OpenSubtitles/v2018/raw/ar.zip',
            zip_file_path)
    print("Unzipping...")
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        zip_ref.extractall(unzip_folder)
    print("Unzipped.")
else:
    print(f"{unzip_folder} already exists. Skipping download and unzip steps.")


# Extract the Arabic sentences from one OpenSubtitles XML file.
def get_arabic_sentences_from_xml(xml_file_path):
    tree = ET.parse(xml_file_path)
    root = tree.getroot()
    sentences = []
    for sentence in root.iter('s'):
        arabic_text = "".join(sentence.itertext()).strip()
        if arabic_text:
            sentences.append(arabic_text)
    return sentences


translator = Translator("seamlessM4T_large",
                        vocoder_name_or_card="vocoder_36langs",
                        device=torch.device("cpu"),
                        dtype=torch.float32)


def generate_paraphrizing_texts(text):
    """Paraphrase *text* by round-tripping it through several pivot languages."""
    data = []
    for lang_code in ['eng', 'fra', 'cmn', 'spa', 'rus']:
        # Translate the text into the pivot language.
        translated_text, _, _ = translator.predict(text, 't2tt', lang_code, src_lang='arb')
        # Translate it back into Arabic.
        retranslated_text, _, _ = translator.predict(translated_text, 't2tt', 'arb', src_lang=lang_code)
        data.append(retranslated_text)
    return data


autoco = AutoCorrector()
tagger = wordtag.WordTagger()
hobj = hunspell.HunSpell('/usr/share/hunspell/ar.dic', '/usr/share/hunspell/ar.aff')


def normalize_text(text):
    # Keep Arabic letters, digits, whitespace, and the punctuation marks
    # recognized in Arabic; drop everything else.
    arabic_text_pattern = r'[^ء-ي0-9،؛؟.!\s]'
    cleaned_text = re.sub(arabic_text_pattern, '', text)
    cleaned_text = cleaned_text.replace('...', '').replace('..', '')  # remove ellipsis shortcuts
    return normalize_hamza(strip_tatweel(strip_tashkeel(cleaned_text)), method="tasheel")


def contains_arabic(text):
    pattern = r'[\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF]'
    return re.search(pattern, text) is not None


def spell_ghalatawi(text):
    return autoco.spell(text)


def spell_hunspell(text):
    words = text.split()
    corrected_words = []
    for word in words:
        # Leave the word alone if it matches one of Ghalatawi's known
        # replacement patterns.
        if any(pattern.search(word) for (pattern, replacement) in ReplacementTablePount):
            corrected_words.append(word)
            continue
        # Otherwise spell-check it and take Hunspell's first suggestion.
        if isArabicword(word) and not hobj.spell(word) and hobj.suggest(word):
            corrected_words.append(hobj.suggest(word)[0])
        else:
            corrected_words.append(word)
    return ' '.join(corrected_words)


def print_html(text):
    print(text)


def load_completed_folders(file_path):
    if os.path.exists(file_path):
        with open(file_path, 'rb') as f:
            return pickle.load(f)
    return []
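# A minimal, commented-out sanity check for the cleaning chain defined above
# (normalize_text -> spell_ghalatawi -> spell_hunspell). The sample string is
# illustrative only, not taken from the corpus:
#
#   sample = normalize_text("هذا مثالٌ بسيطـــ... للتجربة!")
#   sample = spell_hunspell(spell_ghalatawi(sample))
#   print(sample)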
# Output buffer, flush threshold, and output file for the collected pairs.
buffered_data = []
BUFFER_SIZE = 100
OUTPUT_FILE = "open_subtitle_para_ar_.txt"

base_folder = 'OpenSubtitles/OpenSubtitles/raw/ar'
completed_folders = load_completed_folders('completed_folders.pkl')

year_folders = [f for f in os.listdir(base_folder)
                if os.path.isdir(os.path.join(base_folder, f))]

# Loop with counter over the year folders.
for index, year_folder in enumerate(year_folders, start=1):
    year_path = os.path.join(base_folder, year_folder)
    print(f"Processing year folder {index}/{len(year_folders)}: {year_folder}")

    subfolders_list = [sf for sf in os.listdir(year_path)
                       if os.path.isdir(os.path.join(year_path, sf))]

    # tqdm progress bar over the subfolders.
    for sub_folder in tqdm(subfolders_list, desc="Subfolders", ncols=100):
        if sub_folder in completed_folders:
            continue
        sub_folder_path = os.path.join(year_path, sub_folder)

        for xml_file in os.listdir(sub_folder_path):
            if not xml_file.endswith('.xml'):
                continue
            xml_file_path = os.path.join(sub_folder_path, xml_file)
            arabic_sentences = get_arabic_sentences_from_xml(xml_file_path)
            arabic_sentences = arabic_sentences[3:]  # Skip the introduction.

            for sentence in arabic_sentences:
                if len(sentence) < 5 or not contains_arabic(sentence):
                    continue

                paraphrased_texts = []
                for paraphrasing_object in generate_paraphrizing_texts(normalize_text(sentence)):
                    try:
                        paraphrasing_string = paraphrasing_object.bytes().decode('utf-8')
                        if paraphrasing_string in paraphrased_texts:
                            continue
                        paraphrased_texts.append(paraphrasing_string)

                        # Two-stage spell correction: Ghalatawi first, then Hunspell.
                        ghalatawi_spell = spell_ghalatawi(paraphrasing_string)
                        if paraphrasing_string != ghalatawi_spell:
                            paraphrasing_string = ghalatawi_spell
                        hunspell_spell = spell_hunspell(paraphrasing_string)
                        if paraphrasing_string != hunspell_spell:
                            paraphrasing_string = hunspell_spell

                        buffered_data.append(f"{sentence}\t\t{paraphrasing_string}\n")
                        if len(buffered_data) >= BUFFER_SIZE:
                            with open(OUTPUT_FILE, "a", encoding="utf-8") as f:
                                f.write("".join(buffered_data))
                            buffered_data.clear()
                    except Exception as e:
                        print(f"\nAn exception occurred: {e}")

        completed_folders.append(sub_folder)
        print(f"\nCompleted subfolder: {sub_folder}")
        with open('completed_folders.pkl', 'wb') as f:
            pickle.dump(completed_folders, f)

# Write any remaining buffered data to disk.
if buffered_data:
    with open(OUTPUT_FILE, "a", encoding="utf-8") as f:
        f.write("".join(buffered_data))
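# Commented-out sketch for consuming the collected pairs later. It assumes
# each line has the "<original>\t\t<paraphrase>" shape written above:
#
#   pairs = []
#   with open(OUTPUT_FILE, encoding="utf-8") as f:
#       for line in f:
#           parts = line.rstrip("\n").split("\t\t")
#           if len(parts) == 2:
#               pairs.append(tuple(parts))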