# -*- coding: utf-8 -*-
"""CollectParaphrizingData.ipynb
"""
from seamless_communication.models.inference import Translator
import pickle
from tqdm import tqdm
import os
import torch
import urllib.request
import zipfile
import xml.etree.ElementTree as ET
import time
import re
from pyarabic.araby import normalize_hamza, strip_tatweel, strip_tashkeel
from ghalatawi.autocorrector import AutoCorrector
from ghalatawi.ar_ghalat import isArabicword
from ghalatawi.ghalat_const import ReplacementTablePount
import naftawayh.wordtag as wordtag
import hunspell
# Download and unzip the OpenSubtitles Arabic archive only if it doesn't exist
zip_file_path = 'ar.zip'
unzip_folder = 'OpenSubtitles'
if not os.path.exists(unzip_folder):
    if not os.path.exists(zip_file_path):
        print("Downloading...")
        urllib.request.urlretrieve('https://object.pouta.csc.fi/OPUS-OpenSubtitles/v2018/raw/ar.zip', zip_file_path)
    print("Unzipping...")
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        zip_ref.extractall(unzip_folder)
    print("Unzipped.")
else:
    print(f"{unzip_folder} already exists. Skipping download and unzip steps.")
# Function to extract Arabic sentences from an OpenSubtitles XML file
def get_arabic_sentences_from_xml(xml_file_path):
    tree = ET.parse(xml_file_path)
    root = tree.getroot()
    sentences = []
    for sentence in root.iter('s'):
        arabic_text = "".join(sentence.itertext()).strip()
        if arabic_text:
            sentences.append(arabic_text)
    return sentences
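# Illustrative sketch (a hypothetical snippet, not taken from the corpus):
# the raw OpenSubtitles XML wraps each subtitle line in an <s> element,
# possibly with nested tags, which is why itertext() is used above:
#
#   <document>
#     <s id="1">مرحبا بالعالم</s>
#     <s id="2"><w>كيف</w> <w>حالك؟</w></s>
#   </document>
#
# get_arabic_sentences_from_xml on such a file returns
# ['مرحبا بالعالم', 'كيف حالك؟'].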
# SeamlessM4T text-to-text translator used for round-trip paraphrasing
translator = Translator(
    "seamlessM4T_large",
    vocoder_name_or_card="vocoder_36langs",
    device=torch.device("cpu"),
    dtype=torch.float64,
)
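# Note: CPU with float64 is what the original notebook used; this is an
# unusually heavy setting, and float32 (or a CUDA device, if available)
# would typically be much faster for SeamlessM4T inference.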
def generate_paraphrasing_texts(text):
    data = []
    for lang_code in ['eng', 'fra', 'cmn', 'spa', 'rus']:
        # Translate the text into a pivot language
        translated_text, _, _ = translator.predict(text, 't2tt', lang_code, src_lang='arb')
        # Translate the text back into Arabic
        retranslated_text, _, _ = translator.predict(translated_text, 't2tt', 'arb', src_lang=lang_code)
        data.append(retranslated_text)
    return data
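# A minimal usage sketch (assuming the SeamlessM4T checkpoints are available
# and the input is Modern Standard Arabic):
#
#   candidates = generate_paraphrasing_texts("أنا ذاهب إلى السوق")
#   # -> up to five round-trip Arabic variants, one per pivot language;
#   #    duplicates are filtered out later in the main loop.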
autoco = AutoCorrector()
tagger = wordtag.WordTagger()
hobj = hunspell.HunSpell('/usr/share/hunspell/ar.dic', '/usr/share/hunspell/ar.aff')
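# Assumption: the paths above point to a system-wide Arabic Hunspell
# dictionary (e.g. the hunspell-ar package on Debian/Ubuntu); adjust them
# if the dictionary lives elsewhere on your machine.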
def normalize_text(text):
    # Keep Arabic letters, digits, whitespace, and punctuation marks
    # recognized in Arabic (،؛؟ . !); drop everything else
    arabic_text_pattern = r'[^ء-ي0-9،؛؟.!\s]'
    cleaned_text = re.sub(arabic_text_pattern, '', text)
    cleaned_text = cleaned_text.replace('...', '').replace('..', '')  # remove ellipsis shorthand
    return normalize_hamza(strip_tatweel(strip_tashkeel(cleaned_text)), method="tasheel")
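# Behavior sketch (assumed, based on the pyarabic helpers above): diacritics,
# tatweel, and any non-Arabic characters are dropped, so an input such as
# "الْكِتَابُ <b>" comes out roughly as "الكتاب"; exact hamza handling depends
# on normalize_hamza's method="tasheel".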
def contains_arabic(text):
    # Match any character in the main Arabic Unicode blocks (including supplements)
    pattern = r'[\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF]'
    return bool(re.search(pattern, text))
def spell_ghalatawi(text):
    return autoco.spell(text)
def spell_hunspell(text):
    words = text.split()
    corrected_words = []
    for word in words:
        # Skip words that match any pattern in Ghalatawi's replacement table
        if any(pattern.search(word) for (pattern, replacement) in ReplacementTablePount):
            corrected_words.append(word)
            continue
        # Otherwise spell-check with Hunspell and take its first suggestion
        if isArabicword(word) and not hobj.spell(word) and hobj.suggest(word):
            corrected_words.append(hobj.suggest(word)[0])
        else:
            corrected_words.append(word)
    return ' '.join(corrected_words)
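# Sketch of the two-stage correction applied in the main loop below:
#
#   text = spell_ghalatawi(raw_text)  # rule-based fixes for common errors
#   text = spell_hunspell(text)       # per-word dictionary-based fixes
#
# Taking Hunspell's first suggestion is a heuristic; a stricter pipeline
# might rank or filter suggestions instead.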
def print_html(text):
    # Notebook leftover: originally rendered HTML, now just prints
    print(text)
def load_completed_folders(file_path):
    # Resume support: load the list of subfolders finished in a previous run
    if os.path.exists(file_path):
        with open(file_path, 'rb') as f:
            return pickle.load(f)
    return []
# Buffered writes: accumulate pairs and flush to disk in batches
buffered_data = []
BUFFER_SIZE = 100
base_folder = 'OpenSubtitles/OpenSubtitles/raw/ar'
completed_folders = load_completed_folders('completed_folders.pkl')
year_folders = [f for f in os.listdir(base_folder) if os.path.isdir(os.path.join(base_folder, f))]
# Loop with counter over year folders
for index, year_folder in enumerate(year_folders, start=1):
    year_path = os.path.join(base_folder, year_folder)
    print(f"Processing year folder {index}/{len(year_folders)}: {year_folder}")
    subfolders_list = [sf for sf in os.listdir(year_path) if os.path.isdir(os.path.join(year_path, sf))]
    # tqdm progress bar for subfolders
    for sub_folder in tqdm(subfolders_list, desc="Subfolders", ncols=100):
        if sub_folder in completed_folders:
            continue
        sub_folder_path = os.path.join(year_path, sub_folder)
        for xml_file in os.listdir(sub_folder_path):
            if xml_file.endswith('.xml'):
                xml_file_path = os.path.join(sub_folder_path, xml_file)
                arabic_sentences = get_arabic_sentences_from_xml(xml_file_path)
                arabic_sentences = arabic_sentences[3:]  # Skip the introduction lines
                for sentence in arabic_sentences:
                    if len(sentence) < 5 or not contains_arabic(sentence):
                        continue
                    paraphrased_texts = []
                    for paraphrasing_object in generate_paraphrasing_texts(normalize_text(sentence)):
                        try:
                            paraphrasing_string = paraphrasing_object.bytes().decode('utf-8')
                            if paraphrasing_string in paraphrased_texts:
                                continue
                            paraphrased_texts.append(paraphrasing_string)
                            # Two-stage spell correction: Ghalatawi rules, then Hunspell
                            paraphrasing_string = spell_ghalatawi(paraphrasing_string)
                            paraphrasing_string = spell_hunspell(paraphrasing_string)
                            buffered_data.append(f"{sentence}\t\t{paraphrasing_string}\n")
                            if len(buffered_data) >= BUFFER_SIZE:
                                with open("open_subtitle_para_ar_.txt", "a", encoding="utf-8") as f:
                                    f.write("".join(buffered_data))
                                buffered_data.clear()
                        except Exception as e:
                            print(f"\nAn exception occurred: {e}")
        completed_folders.append(sub_folder)
        print(f"\nCompleted subfolder: {sub_folder}")
        with open('completed_folders.pkl', 'wb') as f:
            pickle.dump(completed_folders, f)
# Write any remaining buffered data to disk
if buffered_data:
    with open("open_subtitle_para_ar_.txt", "a", encoding="utf-8") as f:
        f.write("".join(buffered_data))