Joblib
quality_classifier_pl / main_parquet.py
adgw's picture
fix
aa2da0e verified
# -*- coding: utf-8 -*-
"""
Skrypt do masowego przetwarzania plików parquet w celu klasyfikacji jakości tekstu.
Ten moduł jest przeznaczony do wydajnej analizy dużych zbiorów danych.
Skanuje folder wejściowy w poszukiwaniu plików .parquet, przetwarza każdy z nich
równolegle z użyciem wielu procesów (`multiprocessing`), a następnie zapisuje
wyniki do nowego pliku w folderze wyjściowym, zachowując oryginalną strukturę
danych i dodając wyniki klasyfikacji.
"""
import os
import glob
import time
import pickle
import joblib
import pandas as pd
import json
import numpy as np
from tqdm import tqdm
from typing import List
from text_analyzer.analyzer import TextAnalyzer
from text_analyzer import constants
# --- Ładowanie modeli i konfiguracja ---
with open('models/scaler.pkl', 'rb') as f:
scaler = pickle.load(f)
classifier = joblib.load("models/model.joblib")
text_analyzer = TextAnalyzer()
batch_size = 10
class NumpyJSONEncoder(json.JSONEncoder):
"""
Specjalny enkoder JSON do obsługi typów danych z NumPy,
które nie są domyślnie serializowalne.
"""
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
if isinstance(obj, np.floating):
return float(obj)
if isinstance(obj, np.ndarray):
return obj.tolist()
return super(NumpyJSONEncoder, self).default(obj)
# --- Definicje funkcji ---
def predict_batch(texts: List[str], analyzer: TextAnalyzer, scaler_model, classifier_model) -> List[tuple[str | None, float | None]]:
"""
Przetwarza całą listę tekstów wsadowo i zwraca listę predykcji.
"""
all_features = []
# Krok 1: Ekstrakcja cech dla wszystkich tekstów
feature_generator = analyzer.analyze_batch(texts, batch_size=batch_size)
for features_dict in tqdm(feature_generator, total=len(texts), desc="Analiza cech"):
ordered_features = [features_dict.get(fname, 0.0) for fname in constants.COLUMN_ORDER]
all_features.append(ordered_features)
if not all_features:
return []
# Krok 2: Przygotowanie i skalowanie wszystkich wektorów naraz
features_df = pd.DataFrame(all_features, columns=constants.COLUMN_ORDER)
features_scaled = scaler_model.transform(features_df)
# Krok 3: Predykcja dla całej paczki
pred_probas = classifier_model.predict_proba(features_scaled)
# Krok 4: Przetworzenie wyników
results = []
labels = ["LOW", "MEDIUM", "HIGH"]
for single_pred_proba in pred_probas:
category_prob = {
label: prob
for label, prob in zip(labels, single_pred_proba)
}
# Sortujemy, aby znaleźć kategorię z najwyższym prawdopodobieństwem
sorted_category_prob = sorted(category_prob.items(), key=lambda item: item[1], reverse=True)
most_probable_category, confidence = sorted_category_prob[0]
results.append((most_probable_category, round(float(confidence) * 100, 2)))
return results
def process_parquet_file(input_file: str, output_file: str):
"""
Orkiestruje proces przetwarzania pojedynczego pliku .parquet wsadowo.
Wczytuje plik, przetwarza kolumnę 'text', a następnie dopisuje
wynikowe kolumny 'quality_ai' i 'confidence' do nowego pliku Parquet.
"""
try:
# Krok 1: Wczytaj cały plik Parquet do ramki danych pandas
df = pd.read_parquet(input_file)
except Exception as e:
print(f"Nie udało się wczytać pliku {input_file}. Błąd: {e}")
return
# Sprawdzenie, czy kolumna 'text' istnieje
if 'text' not in df.columns:
print(f"Błąd: W pliku {input_file} brakuje wymaganej kolumny 'text'.")
return
# Krok 2: Przygotuj dane do przetwarzania wsadowego
texts_to_process = df['text'].tolist()
print(f"Wczytano {len(texts_to_process)} wierszy. Rozpoczynam przetwarzanie wsadowe...")
# Krok 3: Wywołaj funkcję wsadową (ta część pozostaje bez zmian)
# Zakładamy, że predict_batch zwraca listę tuple: [(kategoria, pewność), ...]
results = predict_batch(texts_to_process, text_analyzer, scaler, classifier)
# Krok 4: Dodaj wyniki jako nowe kolumny do ramki danych
categories = [res[0] for res in results]
confidences = [res[1] for res in results]
df['quality_ai'] = categories
df['confidence'] = confidences
# Krok 5: Zapisz zmodyfikowaną ramkę danych do nowego pliku Parquet
try:
df.to_parquet(output_file, index=False)
print(df.head(10))
print(f"Pomyślnie zapisano przetworzone dane do pliku {output_file}")
except Exception as e:
print(f"Nie udało się zapisać pliku {output_file}. Błąd: {e}")
# --- Główny blok wykonawczy ---
if __name__ == '__main__':
print("Inicjalizacja skryptu przetwarzania wsadowego...")
INPUT_FOLDER = 'input_parquet'
OUTPUT_FOLDER = 'output'
os.makedirs(OUTPUT_FOLDER, exist_ok=True)
# Skanowanie plików
parquet_files = glob.glob(os.path.join(INPUT_FOLDER, '*.parquet'))
for file_path in parquet_files:
start_time = time.time()
output_file = os.path.join(OUTPUT_FOLDER, os.path.basename(file_path))
if os.path.exists(output_file):
print(f"POMIJAM - plik już istnieje: {output_file}")
continue
print(f"\n--- Przetwarzanie pliku: {file_path} ---")
process_parquet_file(file_path, output_file)
end_time = time.time()
print(f"Processing time: {end_time - start_time:.4f} seconds")
print("\nWszystkie pliki zostały przetworzone!")
Free AI Image Generator No sign-up. Instant results. Open Now