Source code for logflow.relationsdiscover.Worker_per_cardinality

# Copyright 2020 BULL SAS All rights reserved #
from logflow.relationsdiscover.Model import LSTMLayer
from logflow.relationsdiscover.StoppingCondition import StoppingCondition
from logflow.relationsdiscover.Result import Result
from logflow.relationsdiscover.Saver import Saver
from logflow.relationsdiscover.Cardinality import Cardinality
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
import numpy as np  # type: ignore
from loguru import logger
import torch
import torch.optim as optim
import torch.nn as nn
import threading
import random
# logger.add("./file_training.log")

[docs]class Worker_single(): """A single worker is responsible for the creation of the dataloader, the learning/testing step and for saving files of one cardinality. Args: cardinality (Cardinality): the cardinality object containing the data. lock (threading.Lock): lock used for saving files in the same file for all cardinalities. batch_size (int, optional): size of the batch. Defaults to 128. path_model (str, optional): path to the model to save. Defaults to "". name_dataset (str, optional): name of the dataset. Defaults to "". batch_result (int, optional): show results each batch_result number of batchs. Defaults to 2000. exclude_test (boolean, optional): exlude the testing step during the learning step. Can be use with the timer as stopping condition to have an exact duration. stoppingcondition (str, optional): condition to stop the learning step (timer, earlystopping, epoch). Defaults to earlystopping. condition_value (float, optional): stoppingcondition option. Value of the increase. Defaults to 0.005. condition_step (int, optional): stoppingcondition option. Number of steps. Defaults to 3. duration (int, optional): stoppingcondition option. Duration of the learning step in minute. Defaults to 60. condition_epoch (int, optional): stoppingcondition option. Number of epochs to be done. Defaults to 3. """ def __init__(self, cardinality: Cardinality, lock : threading.Lock, batch_size=128, path_model="", name_dataset ="", batch_result=20000, exclude_test=False, stoppingcondition="earlystopping", condition_value=0.005, condition_step=3, duration=5, condition_epoch=3): self.dataset = cardinality self.cardinality = self.dataset.cardinality self.batch_size = batch_size self.model = -1 # self.stopping_condition = StoppingCondition(method="earlystopping", condition_value = 0.005, condition_step=3) if stoppingcondition == "earlystopping": self.stopping_condition = StoppingCondition(method=stoppingcondition, condition_value=condition_value, condition_step=condition_step) elif stoppingcondition == "timer": self.stopping_condition = StoppingCondition(method=stoppingcondition, duration=duration) elif stoppingcondition == "epoch": self.stopping_condition = StoppingCondition(method=stoppingcondition, condition_epoch=condition_epoch) else: raise Exception("Stopping condition method is not implemented. Please use 'earlystopping', 'timer', or 'epoch'") self.path_model = path_model self.name_dataset = name_dataset self.lock = lock self.exlude_test = exclude_test self.saver = Saver(path_model=self.path_model, name_model=self.name_dataset, cardinality=self.cardinality, lock=self.lock) self.batch_result = batch_result if torch.cuda.is_available(): self.device = torch.device('cuda') logger.info("Starting learning on GPU") else: self.device = torch.device('cpu') logger.info ("Starting learning on CPU")
[docs] def create_dataloader(self, validation_split=0.6, condition="Test", subsample=False, subsample_split=0.01) -> DataLoader: """Create the dataloader for the learning/testing step. Args: validation_split (float, optional): ratio between the learning and the testing set. Defaults to 0.6. condition (str, optional): if Test the dataloader contains the test data. Else it contains the learning data. Defaults to "Test". subsample (bool, optional): use only a subsample of the data. Can be used for the learning and/or the testing step. Defaults to False. subsample_split (float, optional): ratio of the data to use. Defaults to 0.01. Returns: DataLoader: PyTorch dataloader corresponding to the previous features. """ if not self.dataset.loaded : self.dataset.load_files() self.dataset.compute_position() self.size = len(self.dataset) logger.info("Cardinality: " + str(self.dataset.cardinality) + " size of dataset: " + str(self.size)) logger.info("Nb of classes: " + str(self.dataset.number_of_classes)) # Set the random seed to have always the same random value. random_seed = 42 np.random.seed(random_seed) split = int(np.floor(validation_split * self.size)) indices = list(range(self.size)) np.random.shuffle(indices) if condition == "Test": indices = indices[:split] else: indices = indices[split:] if subsample: split = int(np.floor(subsample_split * len(indices))) np.random.shuffle(indices) indices = indices[:split] sampler = SubsetRandomSampler(indices) dataloader = DataLoader(self.dataset, batch_size=self.batch_size, pin_memory=True, drop_last=True, num_workers=5, sampler=sampler ) # type: ignore return dataloader
[docs] def load_model(self): """Load the learned model from a previous state Raises: e: file is not found """ self.model = LSTMLayer(num_classes=self.dataset.number_of_classes).to(self.device) try: self.model = self.saver.load(model=self.model) except FileNotFoundError as e : logger.critical("No such file: " +self.path_model + self.name_dataset + "_model.lf" + ".torch" ) print("Raising: ", e) raise e
[docs] def train(self, validation_split=0.6, resuming=False): """Train the model Args: validation_split (float, optional): ratio between testing and learning set. Defaults to 0.6. resuming (bool, optional): resume the learning from a previous step. Not implemented yet. Defaults to False. """ # Create the dataloader dataloader_train = self.create_dataloader(validation_split=validation_split, condition="train") if resuming: self.load_model() else: self.model = LSTMLayer(num_classes=self.dataset.number_of_classes, batch_size=self.batch_size).to(self.device) # Create the results result = Result(self.dataset, condition="Train") optimizer = optim.Adam(self.model.parameters()) loss_fn = nn.CrossEntropyLoss() self.model.train() logger.info("Cardinality: " + str(self.cardinality) + " Starting the learning step") # Start the learning while not self.stopping_condition.stop(): for index_batch, batch in enumerate(dataloader_train): optimizer.zero_grad() label = batch['output'] input_data = batch['input'].to(self.device) prediction = self.model(input_data) loss = loss_fn(prediction, label.to(self.device)) loss.backward() optimizer.step() result.update(prediction, label) # Compute the results each 2000 batchs. if index_batch % self.batch_result == 0 and index_batch != 0: result.computing_result(progress=index_batch/len(dataloader_train)) self.saver.save(model=self.model, result=result, condition="temp") if not self.exlude_test: # Test only on a subsample self.test(subsample=True, subsample_split=0.1) print(self.stopping_condition) # Test if we stop only for the timer method at each batch if self.stopping_condition.method == "timer" and self.stopping_condition.stop(): logger.debug("[Stopping] Cardinality: " + str(self.cardinality) + " " + str(self.stopping_condition) + " stopping learning step.") break # At the end of one epoch, use the all testing test and update the condition if not self.exlude_test: self.test() result.computing_result(reinit=True, progress=1) if self.stopping_condition.stop(): logger.debug("[Stopping] Cardinality: " + str(self.cardinality) + " " + str(self.stopping_condition) + " stopping learning step.") self.saver.save(model=self.model, result=result, condition="Train")
# logger.info("[Test] Cardinality: " + str(self.cardinality) + " " + str(self.stopping_condition) + " stopping learning step.") # self.saver.save(model=self.model.state_dict())
[docs] def test(self, validation_split=0.6, subsample=False, subsample_split=0.01): """Test the model Args: validation_split (float, optional): ratio between testing and learning set. Defaults to 0.6. subsample (bool, optional): if False, use all the available data, if True, use only a ratio of the data (subsample_split*data). Defaults to False. subsample_split (float, optional): ratio of the data to use. Defaults to 0.01. """ dataloader_test = self.create_dataloader(validation_split=validation_split, condition="Test", subsample=subsample, subsample_split=0.01) result = Result(self.dataset, condition="Test", subsample=subsample) if self.model == -1: self.load_model() self.model.eval() self.conf_matrix = torch.zeros(self.dataset.number_of_classes, self.dataset.number_of_classes) for index_batch, batch in enumerate(dataloader_test): label = batch['output'] input_data = batch['input'].to(self.device) prediction = self.model(input_data) result.update(prediction, label) if index_batch % self.batch_result == 0: result.computing_result(reinit=False, progress=index_batch/len(dataloader_test)) self.model.train() self.saver.save(model=self.model, result=result, condition="Test") result.computing_result(reinit=True, progress=1) self.stopping_condition.update(result.microf1) self.stopping_condition.stop()