# -*- coding: utf-8 -*-
"""CollectParaphrizingData.ipynb
"""
from seamless_communication.models.inference import Translator
import pickle
from tqdm import tqdm
import os
import torch
import urllib.request
import zipfile
import xml.etree.ElementTree as ET
import time
import re
from pyarabic.araby import normalize_hamza, strip_tatweel, strip_tashkeel
#from IPython.core.display import display, HTML
from ghalatawi.autocorrector import AutoCorrector
from ghalatawi.ar_ghalat import isArabicword
from ghalatawi.ghalat_const import ReplacementTablePount
import naftawayh.wordtag as wordtag
import hunspell
# Download and unzip the file only if it doesn't exist
zip_file_path = 'ar.zip'
unzip_folder = 'OpenSubtitles'
if not os.path.exists(unzip_folder):
    if not os.path.exists(zip_file_path):
        print("Downloading...")
        urllib.request.urlretrieve('https://object.pouta.csc.fi/OPUS-OpenSubtitles/v2018/raw/ar.zip', zip_file_path)
    print("Unzipping...")
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        zip_ref.extractall(unzip_folder)
    print("Unzipped.")
else:
    print(f"{unzip_folder} already exists. Skipping download and unzip steps.")
# Function to extract Arabic sentences from an XML file
def get_arabic_sentences_from_xml(xml_file_path):
    tree = ET.parse(xml_file_path)
    root = tree.getroot()
    sentences = []
    for sentence in root.iter('s'):
        arabic_text = "".join(sentence.itertext()).strip()
        if arabic_text:
            sentences.append(arabic_text)
    return sentences
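# Illustrative shape of the XML this parser expects (assumed, based on the
# OPUS "raw" release): each subtitle line is an <s> element, e.g.
#   <s id="1"><time id="T1S" value="00:00:01,000" />مرحباً بكم</s>
# itertext() concatenates the text nodes inside each <s>, so timing tags and
# other child elements contribute nothing to the extracted sentence.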
translator = Translator("seamlessM4T_large", vocoder_name_or_card="vocoder_36langs", device=torch.device("cpu"), dtype=torch.float64)
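# 't2tt' selects text-to-text translation; predict() returns (text, wav, sr),
# and the audio fields are unused here, hence the `_, _` unpacking below.
# Note that float64 on CPU is unusually heavy for inference; float32 (or
# float16 on GPU) is a common lighter-weight alternative.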
def generate_paraphrizing_texts(text):
    data = []
    for lang_code in ['eng', 'fra', 'cmn', 'spa', 'rus']:
        # Translate the text into the pivot language
        translated_text, _, _ = translator.predict(text, 't2tt', lang_code, src_lang='arb')
        # Translate the text back into Arabic
        retranslated_text, _, _ = translator.predict(translated_text, 't2tt', 'arb', src_lang=lang_code)
        data.append(retranslated_text)
    return data
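# Illustrative round trip (actual outputs depend on the model; shown only to
# make the idea concrete): arb -> eng -> arb might map
#   "الكتاب على الطاولة" -> "The book is on the table" -> "الكتاب موجود على الطاولة"
# i.e. a paraphrase of the input. The five pivot languages yield up to five
# candidate paraphrases per sentence.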
autoco = AutoCorrector()
tagger = wordtag.WordTagger()
hobj = hunspell.HunSpell('/usr/share/hunspell/ar.dic', '/usr/share/hunspell/ar.aff')
def normalize_text(text):
    # Keep Arabic letters, digits, whitespace, and punctuation marks recognized in Arabic
    arabic_text_pattern = r'[^ء-ي0-9،؛؟.!\s]'
    cleaned_text = re.sub(arabic_text_pattern, '', text)
    cleaned_text = cleaned_text.replace('...', '').replace('..', '')  # remove ellipsis marks
    return normalize_hamza(strip_tatweel(strip_tashkeel(cleaned_text)), method="tasheel")
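# Illustrative example (exact output depends on pyarabic's rules): diacritics
# fall outside the kept range ء-ي, so the regex already strips them, e.g.
#   normalize_text("الْكِتَابُ...")  ->  "الكتاب"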
def contains_arabic(text):
    # Arabic, Arabic Supplement, and Arabic Extended-A Unicode blocks
    pattern = r'[\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF]'
    return bool(re.search(pattern, text))
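# Quick sanity checks (illustrative):
#   contains_arabic("hello")  -> False
#   contains_arabic("مرحبا")  -> True   # letters in the U+0600-U+06FF block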
def spell_ghalatawi(text):
    return autoco.spell(text)
def spell_hunspell(text):
    words = text.split()
    corrected_words = []
    for word in words:
        # Leave the word untouched if it matches any of the punctuation-replacement patterns
        if any(pattern.search(word) for (pattern, replacement) in ReplacementTablePount):
            corrected_words.append(word)
            continue
        # Otherwise spell-check: take hunspell's first suggestion for misspelled Arabic words
        if isArabicword(word) and not hobj.spell(word) and hobj.suggest(word):
            corrected_words.append(hobj.suggest(word)[0])
        else:
            corrected_words.append(word)
    return ' '.join(corrected_words)
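# Assumed setup: the dictionaries at /usr/share/hunspell/ar.{dic,aff} come from
# a system package (e.g. hunspell-ar on Debian/Ubuntu). Usage sketch:
#   spell_hunspell(paraphrase) returns the text with each misspelled Arabic
#   word replaced by hunspell's first suggestion, all other tokens unchanged.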
def print_html(text):
    print(text)
def load_completed_folders(file_path):
    """Return the checkpointed list of completed subfolders, or [] on first run."""
    if os.path.exists(file_path):
        with open(file_path, 'rb') as f:
            return pickle.load(f)
    return []
# Buffer output lines and flush them in batches to limit disk writes
buffered_data = []
BUFFER_SIZE = 100
output_path = "open_subtitle_para_ar_.txt"  # single output file for all pairs
base_folder = 'OpenSubtitles/OpenSubtitles/raw/ar'
completed_folders = load_completed_folders('completed_folders.pkl')
year_folders = [f for f in os.listdir(base_folder) if os.path.isdir(os.path.join(base_folder, f))]
# Loop with a counter over the year folders
for index, year_folder in enumerate(year_folders, start=1):
    year_path = os.path.join(base_folder, year_folder)
    print(f"Processing year folder {index}/{len(year_folders)}: {year_folder}")
    subfolders_list = [sf for sf in os.listdir(year_path) if os.path.isdir(os.path.join(year_path, sf))]
    # tqdm progress bar for subfolders
    for sub_folder in tqdm(subfolders_list, desc="Subfolders", ncols=100):
        if sub_folder in completed_folders:
            continue
        sub_folder_path = os.path.join(year_path, sub_folder)
        for xml_file in os.listdir(sub_folder_path):
            if xml_file.endswith('.xml'):
                xml_file_path = os.path.join(sub_folder_path, xml_file)
                arabic_sentences = get_arabic_sentences_from_xml(xml_file_path)
                arabic_sentences = arabic_sentences[3:]  # skip the introduction lines
                for sentence in arabic_sentences:
                    if len(sentence) < 5 or not contains_arabic(sentence):
                        continue
                    paraphrased_texts = []
                    for paraphrasing_object in generate_paraphrizing_texts(normalize_text(sentence)):
                        try:
                            paraphrasing_string = paraphrasing_object.bytes().decode('utf-8')
                            if paraphrasing_string in paraphrased_texts:
                                continue  # drop duplicate paraphrases
                            paraphrased_texts.append(paraphrasing_string)
                            # Apply both spell checkers, keeping each correction that changes the text
                            ghalatawi_spell = spell_ghalatawi(paraphrasing_string)
                            if paraphrasing_string != ghalatawi_spell:
                                paraphrasing_string = ghalatawi_spell
                            hunspell_spell = spell_hunspell(paraphrasing_string)
                            if paraphrasing_string != hunspell_spell:
                                paraphrasing_string = hunspell_spell
                            buffered_data.append(f"{sentence}\t\t{paraphrasing_string}\n")
                            if len(buffered_data) >= BUFFER_SIZE:
                                with open(output_path, "a", encoding="utf-8") as f:
                                    f.write("".join(buffered_data))
                                buffered_data.clear()
                        except Exception as exc:
                            print(f"\nAn exception occurred: {exc}")
        completed_folders.append(sub_folder)
        print(f"\nCompleted subfolder: {sub_folder}")
        with open('completed_folders.pkl', 'wb') as f:
            pickle.dump(completed_folders, f)
# Flush any remaining buffered data to disk
if buffered_data:
    with open(output_path, "a", encoding="utf-8") as f:
        f.write("".join(buffered_data))