|
|
|
"""CollectParaphrizingData.ipynb |
|
""" |
|
|
|
import os
import pickle
import re
import urllib.request
import xml.etree.ElementTree as ET
import zipfile

import torch
from tqdm import tqdm

from seamless_communication.models.inference import Translator
from pyarabic.araby import normalize_hamza, strip_tatweel, strip_tashkeel

from ghalatawi.autocorrector import AutoCorrector
from ghalatawi.ar_ghalat import isArabicword
from ghalatawi.ghalat_const import ReplacementTablePount
import naftawayh.wordtag as wordtag
import hunspell
|
|
|
|
|
# Download and extract the OPUS OpenSubtitles v2018 raw Arabic corpus,
# skipping whichever steps have already been completed.
zip_file_path = 'ar.zip'
unzip_folder = 'OpenSubtitles'

if not os.path.exists(unzip_folder):
    if not os.path.exists(zip_file_path):
        print("Downloading...")
        urllib.request.urlretrieve('https://object.pouta.csc.fi/OPUS-OpenSubtitles/v2018/raw/ar.zip', zip_file_path)

    print("Unzipping...")
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        zip_ref.extractall(unzip_folder)
    print("Unzipped.")
else:
    print(f"{unzip_folder} already exists. Skipping download and unzip steps.")
|
|
|
|
|
def get_arabic_sentences_from_xml(xml_file_path): |
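    """Parse a subtitle XML file and return the text of every <s> element."""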
|
tree = ET.parse(xml_file_path) |
|
root = tree.getroot() |
|
sentences = [] |
|
for sentence in root.iter('s'): |
|
arabic_text = "".join(sentence.itertext()).strip() |
|
if arabic_text: |
|
sentences.append(arabic_text) |
|
return sentences |
|
|
|
|
|
|
|
# Text-to-text translation model (SeamlessM4T large), running on CPU.
translator = Translator("seamlessM4T_large", vocoder_name_or_card="vocoder_36langs", device=torch.device("cpu"), dtype=torch.float64)
|
|
|
def generate_paraphrasing_texts(text):
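    """Round-trip translate an Arabic sentence through several pivot
    languages and return the back-translated Arabic variants."""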
|
    data = []
    for lang_code in ['eng', 'fra', 'cmn', 'spa', 'rus']:
        # Arabic -> pivot language...
        translated_text, _, _ = translator.predict(text, 't2tt', lang_code, src_lang='arb')
        # ...and back into Arabic.
        retranslated_text, _, _ = translator.predict(translated_text, 't2tt', 'arb', src_lang=lang_code)
        data.append(retranslated_text)
    return data
|
|
|
|
|
|
|
|
|
|
|
autoco = AutoCorrector()
tagger = wordtag.WordTagger()  # naftawayh word tagger (loaded but not used below)
hobj = hunspell.HunSpell('/usr/share/hunspell/ar.dic', '/usr/share/hunspell/ar.aff')
|
|
|
def normalize_text(text): |
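    """Remove non-Arabic characters and ellipses, strip diacritics and
    tatweel, and normalize hamza forms."""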
|
|
|
    # Keep only Arabic letters, digits, Arabic punctuation, '.', '!', and whitespace.
    arabic_text_pattern = r'[^ء-ي0-9،؛؟.!\s]'
    cleaned_text = re.sub(arabic_text_pattern, '', text)
    cleaned_text = cleaned_text.replace('...', '').replace('..', '')
    return normalize_hamza(strip_tatweel(strip_tashkeel(cleaned_text)), method="tasheel")
|
|
|
def contains_arabic(text): |
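    """Return True if the text contains at least one Arabic character."""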
|
    pattern = r'[\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF]'
    return bool(re.search(pattern, text))
|
|
|
def spell_ghalatawi(text): |
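    """Autocorrect common Arabic misspellings with ghalatawi."""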
|
return autoco.spell(text) |
|
|
|
def spell_hunspell(text): |
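    """Spell-check each word with hunspell, replacing misspelled Arabic
    words with the first suggestion."""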
|
    words = text.split()
    corrected_words = []

    for word in words:
        # Leave words matched by ghalatawi's punctuation-replacement table untouched.
        if any(pattern.search(word) for (pattern, replacement) in ReplacementTablePount):
            corrected_words.append(word)
            continue

        # Replace misspelled Arabic words with hunspell's first suggestion.
        if isArabicword(word) and not hobj.spell(word) and hobj.suggest(word):
            corrected_words.append(hobj.suggest(word)[0])
        else:
            corrected_words.append(word)

    return ' '.join(corrected_words)
|
|
|
def print_html(text): |
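    """Display helper from the notebook; plain print when run as a script."""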
|
print(text) |
|
|
|
|
|
|
|
|
|
|
|
|
def load_completed_folders(file_path): |
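    """Return the list of already-processed subfolders, or [] if none saved."""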
|
if os.path.exists(file_path): |
|
with open(file_path, 'rb') as f: |
|
return pickle.load(f) |
|
return [] |
|
|
|
|
|
# Paraphrase pairs are buffered and appended to a single output file in batches.
buffered_data = []
BUFFER_SIZE = 100
OUTPUT_FILE = "open_subtitle_para_ar_.txt"

base_folder = 'OpenSubtitles/OpenSubtitles/raw/ar'
completed_folders = load_completed_folders('completed_folders.pkl')
|
|
|
year_folders = [f for f in os.listdir(base_folder) if os.path.isdir(os.path.join(base_folder, f))] |
|
|
|
|
|
for index, year_folder in enumerate(year_folders, start=1): |
|
year_path = os.path.join(base_folder, year_folder) |
|
print(f"Processing year folder {index}/{len(year_folders)}: {year_folder}") |
|
|
|
subfolders_list = [sf for sf in os.listdir(year_path) if os.path.isdir(os.path.join(year_path, sf))] |
|
|
|
|
|
for sub_folder in tqdm(subfolders_list, desc="Subfolders", ncols=100): |
|
if sub_folder in completed_folders: |
|
continue |
|
|
|
sub_folder_path = os.path.join(year_path, sub_folder) |
|
|
|
for xml_file in os.listdir(sub_folder_path): |
|
if xml_file.endswith('.xml'): |
|
xml_file_path = os.path.join(sub_folder_path, xml_file) |
|
|
|
                arabic_sentences = get_arabic_sentences_from_xml(xml_file_path)
                # Drop the first few entries, which are typically subtitle
                # metadata/credits rather than dialogue.
                arabic_sentences = arabic_sentences[3:]
|
|
|
for sentence in arabic_sentences: |
|
if len(sentence) < 5 or not contains_arabic(sentence): |
|
continue |
|
|
|
paraphrased_texts = [] |
|
                    for paraphrasing_object in generate_paraphrasing_texts(normalize_text(sentence)):
|
                        try:
                            # Decode the translator's string-like output into a Python str.
                            paraphrasing_string = paraphrasing_object.bytes().decode('utf-8')

                            # Skip duplicates produced by different pivot languages.
                            if paraphrasing_string in paraphrased_texts:
                                continue
                            paraphrased_texts.append(paraphrasing_string)
|
|
|
                            # Apply ghalatawi autocorrection, then hunspell corrections.
                            paraphrasing_string = spell_ghalatawi(paraphrasing_string)
                            paraphrasing_string = spell_hunspell(paraphrasing_string)
|
|
|
buffered_data.append(f"{sentence}\t\t{paraphrasing_string}\n") |
|
|
|
|
|
                            # Flush the buffer to disk once it reaches BUFFER_SIZE lines.
                            if len(buffered_data) >= BUFFER_SIZE:
                                with open(OUTPUT_FILE, "a", encoding="utf-8") as f:
                                    f.write("".join(buffered_data))
                                buffered_data.clear()
|
|
|
                        except Exception as e:
                            print(f"\nAn exception occurred: {e}")
|
|
|
|
|
        # Record progress so an interrupted run can resume where it left off.
        completed_folders.append(sub_folder)
|
print(f"\nCompleted subfolder: {sub_folder}") |
|
with open('completed_folders.pkl', 'wb') as f: |
|
pickle.dump(completed_folders, f) |
|
|
|
|
|
# Flush any remaining buffered pairs to the same output file.
if buffered_data:
    with open(OUTPUT_FILE, "a", encoding="utf-8") as f:
        f.write("".join(buffered_data))