I'm working on a text classification task and decided to use a PyTorch model for it. The process mainly involves these steps: load and balance the data, vectorize the text with TF-IDF, train a small feed-forward network, and save both the model and the vectorizer.
However, every day I need to classify new comments and correct any misclassifications.
Right now my approach is to add the newly (correctly) classified comments to the dataset and retrain the whole model. That process is time-consuming, and the new comments can end up in the validation split and never be trained on. What I would like instead is to build a new dataset from the newly classified texts and continue training on that new data only (the new comments are classified manually, so every label is correct); a small sketch of how such corrections might be collected is shown below.
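For context, a minimal sketch of how the daily corrected comments could be collected into a separate file for later fine-tuning. The helper itself is only an assumption about the workflow; the file name set2.txt and the semicolon "text;label" format mirror the data files used further down.
import csv

def append_corrected_comment(text: str, label: str, path: str = "set2.txt") -> None:
    # Hypothetical helper: store each manually corrected comment in the same
    # "text;label" format as set1.txt so it can later be used for fine-tuning.
    with open(path, "a", newline="", encoding="utf-8") as f:
        csv.writer(f, delimiter=";").writerow([text, label])

append_corrected_comment("i feel a bit uneasy about tomorrow", "fear")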
Using GPT and some code found online, I put together the pipeline I need, but I'm not sure whether it actually works as intended or whether I've made some silly mistake that shouldn't be there.
So the main question is: for the new data, should I keep using the already-fitted vectorizer's .transform() instead of calling .fit_transform() again? Otherwise, would I lose the original vectorizer? Here is the complete training process:
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.preprocessing import LabelEncoder
import polars as pl
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
import joblib
set1 = (
pl
.read_csv(
"set1.txt",
separator=";",
has_header=False,
new_columns=["text","label"]
)
)
# since the dataset is unbalanced, I'm going to force it to be more balanced
fear_df = set1.filter(pl.col("label") == "fear")
joy_df = set1.filter(pl.col("label") == "joy").sample(n=2500)
sadness_df = set1.filter(pl.col("label") == "sadness").sample(n=2500)
anger_df = set1.filter(pl.col("label") == "anger")
train_df = pl.concat([fear_df,joy_df,sadness_df,anger_df])
"""
The text its already clean, so im going to change the labels to numeric
and then split it on train, test ,val
"""
label_mapping = {
"anger": 0,
"fear": 1,
"joy": 2,
"sadness": 3
}
train_mapped = (
train_df
.with_columns(
pl.col("label").replace_strict(label_mapping, default="other").cast(pl.Int16)
)
)
train_set, pre_Test = train_test_split(train_mapped,
test_size=0.4,
random_state=42,
stratify=train_mapped["label"])
test_set, val_set = train_test_split(pre_Test,
test_size=0.5,
random_state=42,
stratify=pre_Test["label"])
# Vectorize text data using TF-IDF
vectorizer = TfidfVectorizer(max_features=30000, ngram_range=(1, 2))
X_train_tfidf = vectorizer.fit_transform(train_set['text']).toarray()
X_val_tfidf = vectorizer.transform(val_set['text']).toarray()
X_test_tfidf = vectorizer.transform(test_set['text']).toarray()
y_train = train_set['label']
y_val = val_set['label']
y_test = test_set['label']
class TextDataset(Dataset):
def __init__(self, texts, labels):
self.texts = texts
self.labels = labels
def __len__(self):
return len(self.texts)
def __getitem__(self, idx):
text = self.texts[idx]
label = self.labels[idx]
return text, label
train_dataset = TextDataset(X_train_tfidf, y_train)
val_dataset = TextDataset(X_val_tfidf, y_val)
test_dataset = TextDataset(X_test_tfidf, y_test)
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)
class TextClassificationModel(nn.Module):
def __init__(self, input_dim, num_classes):
super(TextClassificationModel, self).__init__()
self.fc1 = nn.Linear(input_dim, 64)
self.dropout1 = nn.Dropout(0.5)
self.fc2 = nn.Linear(64, 32)
self.dropout2 = nn.Dropout(0.5)
self.fc3 = nn.Linear(32, num_classes)
def forward(self, x):
x = torch.relu(self.fc1(x))
x = self.dropout1(x)
x = torch.relu(self.fc2(x))
x = self.dropout2(x)
x = torch.softmax(self.fc3(x), dim=1)
return x
input_dim = X_train_tfidf.shape[1]
model = TextClassificationModel(input_dim, 4)
# Define loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adamax(model.parameters())
# Training loop
num_epochs = 17
best_val_acc = 0.0
best_model_path = "modelbest.pth"
for epoch in range(num_epochs):
model.train()
for texts, labels in train_loader:
texts, labels = texts.float(), labels.long()
outputs = model(texts)
loss = criterion(outputs, labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
# Validation
model.eval()
correct, total = 0, 0
with torch.no_grad():
for texts, labels in val_loader:
texts, labels = texts.float(), labels.long()
outputs = model(texts)
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
val_acc = correct / total
if val_acc > best_val_acc:
best_val_acc = val_acc
torch.save(model.state_dict(), best_model_path)
print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}, Val Acc: {val_acc:.4f}')
# Load the best model
model.load_state_dict(torch.load(best_model_path))
# Test the model
model.eval()
correct, total = 0, 0
with torch.no_grad():
for texts, labels in test_loader:
texts, labels = texts.float(), labels.long()
outputs = model(texts)
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
test_acc = correct / total
print(f'Test Acc: {test_acc:.3f}')
# Save the TF-IDF vectorizer
vectorizer_path = "tfidf_vectorizer.pkl"
joblib.dump(vectorizer, vectorizer_path)
# Save the PyTorch model
model_path = "text_classification_model.pth"
torch.save(model.state_dict(), model_path)
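To make the question concrete, here is a small sketch (with made-up example sentences) of what reusing the fitted vectorizer with .transform() does, compared with calling .fit_transform() again on new text:
from sklearn.feature_extraction.text import TfidfVectorizer

old_texts = ["i feel happy today", "i feel so sad"]
new_texts = ["i feel angry and happy"]

vec = TfidfVectorizer()
vec.fit_transform(old_texts)                  # learns the vocabulary from the original data
print(len(vec.get_feature_names_out()))      # size of the feature space the model was trained on

X_new = vec.transform(new_texts)              # reuses that vocabulary; unseen words are ignored
print(X_new.shape)                            # same number of columns as before

X_refit = TfidfVectorizer().fit_transform(new_texts)
print(X_refit.shape)                          # new vocabulary and column count, so the saved
                                              # model's input layer would no longer match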
Proposed code:
import torch
import joblib
import polars as pl
from sklearn.model_selection import train_test_split
from torch import nn
from torch.utils.data import Dataset, DataLoader
# Load the saved TF-IDF vectorizer
vectorizer_path = "tfidf_vectorizer.pkl"
vectorizer = joblib.load(vectorizer_path)
input_dim = len(vectorizer.get_feature_names_out())
class TextClassificationModel(nn.Module):
def __init__(self, input_dim, num_classes):
super(TextClassificationModel, self).__init__()
self.fc1 = nn.Linear(input_dim, 64)
self.dropout1 = nn.Dropout(0.5)
self.fc2 = nn.Linear(64, 32)
self.dropout2 = nn.Dropout(0.5)
self.fc3 = nn.Linear(32, num_classes)
def forward(self, x):
x = torch.relu(self.fc1(x))
x = self.dropout1(x)
x = torch.relu(self.fc2(x))
x = self.dropout2(x)
x = torch.softmax(self.fc3(x), dim=1)
return x
# Load the saved PyTorch model
model_path = "text_classification_model.pth"
model = TextClassificationModel(input_dim, 4)
model.load_state_dict(torch.load(model_path))
# Map labels to numeric values
label_mapping = {"anger": 0, "fear": 1, "joy": 2, "sadness": 3}
sentiments = ["fear","joy","sadness","anger"]
new_data = (
pl
.read_csv(
"set2.txt",
separator=";",
has_header=False,
new_columns=["text","label"]
)
.filter(pl.col("label").is_in(sentiments))
.with_columns(
pl.col("label").replace_strict(label_mapping, default="other").cast(pl.Int16)
)
)
# Vectorize the new text data using the loaded TF-IDF vectorizer
X_new = vectorizer.transform(new_data['text']).toarray()
y_new = new_data['label']
class TextDataset(Dataset):
def __init__(self, texts, labels):
self.texts = texts
self.labels = labels
def __len__(self):
return len(self.texts)
def __getitem__(self, idx):
text = self.texts[idx]
label = self.labels[idx]
return text, label
batch_size = 10
# Create DataLoader for the new training data
new_train_dataset = TextDataset(X_new, y_new)
new_train_loader = DataLoader(new_train_dataset, batch_size=batch_size, shuffle=True)
# Define loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adamax(model.parameters())
num_epochs = 5
new_best_model_path = "modelbest.pth"
for epoch in range(num_epochs):
model.train()
for texts, labels in new_train_loader:
texts, labels = texts.float(), labels.long()
outputs = model(texts)
loss = criterion(outputs, labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
torch.save(model.state_dict(), new_best_model_path)
print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')
# Save the PyTorch model
new_best_model_path = "new_model.pth"
torch.save(model.state_dict(), new_best_model_path)
The dataset can be found here.
Use pre-trained word embeddings, for example via BertForSequenceClassification. These embeddings handle unseen tokens more gracefully, because they map words to continuous vectors based on their semantics, which reduces the impact of words the model has never seen.
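For example, the WordPiece tokenizer behind bert-base-uncased splits a word it has never seen into known subword pieces instead of dropping it. A small illustration (the sample sentence is arbitrary):
from transformers import BertTokenizer

tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

# Rare or unseen words are broken into '##'-prefixed subword pieces that the model
# does know, so they still receive a meaningful representation
# (the exact pieces depend on the vocabulary).
print(tokenizer.tokenize("i feel utterly flabbergasted by the news"))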
Model training with BERT
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset
from transformers import BertTokenizer, BertModel, BertForSequenceClassification
from transformers import Trainer, TrainingArguments
from sklearn.model_selection import train_test_split
import polars as pl
# Load and prepare data
set1 = pl.read_csv("set1.txt", separator=";", has_header=False, new_columns=["text", "label"])
# Balance dataset
fear_df = set1.filter(pl.col("label") == "fear")
joy_df = set1.filter(pl.col("label") == "joy").sample(n=2500)
sadness_df = set1.filter(pl.col("label") == "sadness").sample(n=2500)
anger_df = set1.filter(pl.col("label") == "anger")
train_df = pl.concat([fear_df, joy_df, sadness_df, anger_df])
label_mapping = {"anger": 0, "fear": 1, "joy": 2, "sadness": 3}
train_df = train_df.with_columns(pl.col("label").replace_strict(label_mapping, default="other").cast(pl.Int16))
# Split dataset
train_set, test_val_set = train_test_split(train_df, test_size=0.4, random_state=42, stratify=train_df["label"])
test_set, val_set = train_test_split(test_val_set, test_size=0.5, random_state=42, stratify=test_val_set["label"])
# Dataset class
class TextDataset(Dataset):
def __init__(self, texts, labels, tokenizer, max_length=128):
self.texts = texts
self.labels = labels
self.tokenizer = tokenizer
self.max_length = max_length
def __len__(self):
return len(self.texts)
def __getitem__(self, idx):
text = self.texts[idx]
label = self.labels[idx]
encoding = self.tokenizer.encode_plus(
text,
add_special_tokens=True,
max_length=self.max_length,
padding='max_length',
truncation=True,
return_tensors='pt'
)
return {
'input_ids': encoding['input_ids'].flatten(),
'attention_mask': encoding['attention_mask'].flatten(),
'labels': torch.tensor(label, dtype=torch.long)
}
# Initialize tokenizer and datasets
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
train_dataset = TextDataset(train_set['text'], train_set['label'], tokenizer)
val_dataset = TextDataset(val_set['text'], val_set['label'], tokenizer)
test_dataset = TextDataset(test_set['text'], test_set['label'], tokenizer)
# Initialize BERT model for classification
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=4)
# Training arguments
training_args = TrainingArguments(
output_dir='./results',
num_train_epochs=3,
per_device_train_batch_size=16,
per_device_eval_batch_size=16,
evaluation_strategy='epoch',
save_strategy='epoch',
logging_dir='./logs',
learning_rate=2e-5,
load_best_model_at_end=True
)
# Define accuracy metric and Trainer (trainer.evaluate() only reports
# eval_accuracy if a compute_metrics function is supplied)
def compute_metrics(eval_pred):
    preds = eval_pred.predictions.argmax(axis=-1)
    return {"accuracy": float((preds == eval_pred.label_ids).mean())}

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    compute_metrics=compute_metrics
)
# Train model
trainer.train()
# Evaluate model
results = trainer.evaluate(test_dataset)
print(f"Test Accuracy: {results['eval_accuracy']:.4f}")
# Save the model and tokenizer
model.save_pretrained("saved_model")
tokenizer.save_pretrained("saved_tokenizer")
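Once the model and tokenizer are saved, daily classification of new comments could look roughly like this. This is a minimal sketch; id2label simply inverts the label_mapping defined above, and the sample sentence is made up.
import torch
from transformers import BertTokenizer, BertForSequenceClassification

# Reload the artifacts saved above
model = BertForSequenceClassification.from_pretrained("saved_model")
tokenizer = BertTokenizer.from_pretrained("saved_tokenizer")
model.eval()

id2label = {0: "anger", 1: "fear", 2: "joy", 3: "sadness"}  # inverse of label_mapping

def classify(text: str) -> str:
    # Tokenize one comment and return the highest-scoring emotion
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=128)
    with torch.no_grad():
        logits = model(**inputs).logits
    return id2label[int(logits.argmax(dim=-1))]

print(classify("i feel like everything is falling apart"))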
Incremental training with minimal effort
# Load the saved model and tokenizer
model = BertForSequenceClassification.from_pretrained("saved_model")
tokenizer = BertTokenizer.from_pretrained("saved_tokenizer")
# Load new data
new_data = (
pl.read_csv("set2.txt", separator=";", has_header=False, new_columns=["text", "label"])
.filter(pl.col("label").is_in(["fear", "joy", "sadness", "anger"]))
.with_columns(pl.col("label").replace_strict(label_mapping, default="other").cast(pl.Int16))
)
# Create new dataset
new_dataset = TextDataset(new_data['text'], new_data['label'], tokenizer)
# Update training arguments for incremental training
new_training_args = TrainingArguments(
    output_dir='./results_incremental',
    num_train_epochs=2,  # fewer epochs since it's incremental
    per_device_train_batch_size=16,
    evaluation_strategy='epoch',
    save_strategy='epoch',  # must match evaluation_strategy when load_best_model_at_end=True
    logging_dir='./logs_incremental',
    learning_rate=2e-5,
    load_best_model_at_end=True
)
# Define new trainer (reuse compute_metrics so eval_accuracy is reported)
new_trainer = Trainer(
    model=model,
    args=new_training_args,
    train_dataset=new_dataset,
    eval_dataset=val_dataset,  # validate on the previous validation set
    compute_metrics=compute_metrics
)
# Train on new data
new_trainer.train()
# Evaluate after retraining
new_results = new_trainer.evaluate(test_dataset)
print(f"Test Accuracy After Incremental Training: {new_results['eval_accuracy']:.4f}")
# Save the updated model
model.save_pretrained("saved_model_incremental")