|
|
| """
|
| Skrypt do masowego przetwarzania plików parquet w celu klasyfikacji jakości tekstu.
|
|
|
| Ten moduł jest przeznaczony do wydajnej analizy dużych zbiorów danych.
|
| Skanuje folder wejściowy w poszukiwaniu plików .parquet, przetwarza każdy z nich
|
| równolegle z użyciem wielu procesów (`multiprocessing`), a następnie zapisuje
|
| wyniki do nowego pliku w folderze wyjściowym, zachowując oryginalną strukturę
|
| danych i dodając wyniki klasyfikacji.
|
| """
|
|
|
| import os
|
| import glob
|
| import time
|
| import pickle
|
| import joblib
|
| import pandas as pd
|
| import json
|
| import numpy as np
|
| from tqdm import tqdm
|
| from typing import List
|
|
|
| from text_analyzer.analyzer import TextAnalyzer
|
| from text_analyzer import constants
|
|
|
|
|
|
|
| with open('models/scaler.pkl', 'rb') as f:
|
| scaler = pickle.load(f)
|
| classifier = joblib.load("models/model.joblib")
|
| text_analyzer = TextAnalyzer()
|
|
|
| batch_size = 10
|
|
|
| class NumpyJSONEncoder(json.JSONEncoder):
|
| """
|
| Specjalny enkoder JSON do obsługi typów danych z NumPy,
|
| które nie są domyślnie serializowalne.
|
| """
|
| def default(self, obj):
|
| if isinstance(obj, np.integer):
|
| return int(obj)
|
| if isinstance(obj, np.floating):
|
| return float(obj)
|
| if isinstance(obj, np.ndarray):
|
| return obj.tolist()
|
| return super(NumpyJSONEncoder, self).default(obj)
|
|
|
|
|
|
|
| def predict_batch(texts: List[str], analyzer: TextAnalyzer, scaler_model, classifier_model) -> List[tuple[str | None, float | None]]:
|
| """
|
| Przetwarza całą listę tekstów wsadowo i zwraca listę predykcji.
|
| """
|
| all_features = []
|
|
|
|
|
| feature_generator = analyzer.analyze_batch(texts, batch_size=batch_size)
|
| for features_dict in tqdm(feature_generator, total=len(texts), desc="Analiza cech"):
|
| ordered_features = [features_dict.get(fname, 0.0) for fname in constants.COLUMN_ORDER]
|
| all_features.append(ordered_features)
|
|
|
| if not all_features:
|
| return []
|
|
|
|
|
| features_df = pd.DataFrame(all_features, columns=constants.COLUMN_ORDER)
|
| features_scaled = scaler_model.transform(features_df)
|
|
|
|
|
| pred_probas = classifier_model.predict_proba(features_scaled)
|
|
|
|
|
| results = []
|
| labels = ["LOW", "MEDIUM", "HIGH"]
|
| for single_pred_proba in pred_probas:
|
| category_prob = {
|
| label: prob
|
| for label, prob in zip(labels, single_pred_proba)
|
| }
|
|
|
| sorted_category_prob = sorted(category_prob.items(), key=lambda item: item[1], reverse=True)
|
| most_probable_category, confidence = sorted_category_prob[0]
|
|
|
| results.append((most_probable_category, round(float(confidence) * 100, 2)))
|
|
|
| return results
|
|
|
| def process_parquet_file(input_file: str, output_file: str):
|
| """
|
| Orkiestruje proces przetwarzania pojedynczego pliku .parquet wsadowo.
|
| Wczytuje plik, przetwarza kolumnę 'text', a następnie dopisuje
|
| wynikowe kolumny 'quality_ai' i 'confidence' do nowego pliku Parquet.
|
| """
|
| try:
|
|
|
| df = pd.read_parquet(input_file)
|
| except Exception as e:
|
| print(f"Nie udało się wczytać pliku {input_file}. Błąd: {e}")
|
| return
|
|
|
|
|
| if 'text' not in df.columns:
|
| print(f"Błąd: W pliku {input_file} brakuje wymaganej kolumny 'text'.")
|
| return
|
|
|
|
|
| texts_to_process = df['text'].tolist()
|
| print(f"Wczytano {len(texts_to_process)} wierszy. Rozpoczynam przetwarzanie wsadowe...")
|
|
|
|
|
|
|
| results = predict_batch(texts_to_process, text_analyzer, scaler, classifier)
|
|
|
|
|
| categories = [res[0] for res in results]
|
| confidences = [res[1] for res in results]
|
|
|
| df['quality_ai'] = categories
|
| df['confidence'] = confidences
|
|
|
|
|
| try:
|
| df.to_parquet(output_file, index=False)
|
| print(df.head(10))
|
| print(f"Pomyślnie zapisano przetworzone dane do pliku {output_file}")
|
| except Exception as e:
|
| print(f"Nie udało się zapisać pliku {output_file}. Błąd: {e}")
|
|
|
|
|
|
|
| if __name__ == '__main__':
|
| print("Inicjalizacja skryptu przetwarzania wsadowego...")
|
|
|
| INPUT_FOLDER = 'input_parquet'
|
| OUTPUT_FOLDER = 'output'
|
| os.makedirs(OUTPUT_FOLDER, exist_ok=True)
|
|
|
|
|
| parquet_files = glob.glob(os.path.join(INPUT_FOLDER, '*.parquet'))
|
|
|
| for file_path in parquet_files:
|
| start_time = time.time()
|
| output_file = os.path.join(OUTPUT_FOLDER, os.path.basename(file_path))
|
|
|
| if os.path.exists(output_file):
|
| print(f"POMIJAM - plik już istnieje: {output_file}")
|
| continue
|
|
|
| print(f"\n--- Przetwarzanie pliku: {file_path} ---")
|
| process_parquet_file(file_path, output_file)
|
| end_time = time.time()
|
| print(f"Processing time: {end_time - start_time:.4f} seconds")
|
|
|
| print("\nWszystkie pliki zostały przetworzone!") |