Source code for logflow.treebuilding.Dataset

# Copyright 2020 BULL SAS All rights reserved #
from logflow.treebuilding.Log import Log
from logflow.treebuilding.Parser import Parser
from logflow import logsparser
import sys
sys.modules['logsparser'] = logsparser
from loguru import logger
import pickle
import numpy as np # type: ignore

# Cardinality threshold: Dataset.get_slice rejects (returns -1 for) any log
# whose cardinality is greater than or equal to this value.
cardinality_max = 8

class Dataset:
    """A dataset is an object containing the data.

    It is used to load the files and to compute the window for each new
    prediction on a log. The constructor only stores the configuration;
    ``load_files()`` and ``load_logs()`` must be called explicitly before
    ``get_slice()`` can be used.

    Args:
        path_model (str, optional): path to the model to load. Must be non-empty. Defaults to "".
        name_model (str, optional): name of the model to load; "_model.lf" is appended to it.
            Must be non-empty. Defaults to "".
        index_line_max (int, optional): reading of the log file stops once the line with this
            index has been loaded. Avoid to load all the lines. Defaults to float("+inf").
        path_data (str, optional): path to the logs. Must be non-empty. Defaults to "".
        window_size (int, optional) : size of the window. Defaults to 30.
        parser_function (function, optional): Function to split the log entry and get the message part.
            Defaults to "", means split according to space and uses the words after the 9th position.

    Raises:
        AssertionError: if path_model, path_data or name_model is empty.
    """

    def __init__(self, path_model="", name_model="", index_line_max=float("+inf"), path_data="", window_size=30, parser_function=""):
        self.list_logs = []
        assert path_model != ""
        assert path_data != ""
        assert name_model != ""
        self.path_model = path_model
        self.name_model = name_model + "_model.lf"
        self.path_data = path_data
        self.index_line_max = index_line_max
        self.window_size = window_size
        self.parser_function = parser_function
        # Nothing is loaded here: load_files() and load_logs() are left to the caller.

    def load_files(self):
        """Load the files including word2vec, LSTM, counter and patterns.

        The model file is read from ``path_model + name_model`` (plain string
        concatenation) and must be a pickled dict with the keys "word2vec",
        "counter_patterns", "dict_patterns" and "LSTM".
        """
        model_file = self.path_model + self.name_model
        logger.info("Loading models file: " + model_file)
        # NOTE(security): pickle.load can execute arbitrary code while
        # deserializing; only load model files coming from a trusted source.
        with open(model_file, "rb") as output_file:
            dict_model = pickle.load(output_file)
        self.w2v = dict_model["word2vec"]
        self.counter_patterns = dict_model["counter_patterns"]
        self.dict_patterns = dict_model["dict_patterns"]
        self.LSTM = dict_model["LSTM"]
        self.parser = Parser(dict_patterns=self.dict_patterns, w2v=self.w2v, counter_patterns=self.counter_patterns)

    def load_logs(self):
        """Load the selected logs file up to the index_line_max.

        Each line is wrapped in a ``Log`` and appended to ``self.list_logs``.
        The line whose index equals ``index_line_max`` is still loaded, then
        reading stops.
        """
        with open(self.path_data, "r", encoding="latin-1") as file_logs:
            for index_line, line in enumerate(file_logs):
                self.list_logs.append(Log(line=line, index_line=index_line, parser_function=self.parser_function))
                if index_line >= self.index_line_max:
                    break

    @staticmethod
    def _has_numpy_type(value):
        """Return True when the type of ``value`` is defined in the numpy module."""
        return type(value).__module__ == np.__name__

    def get_slice(self, index_line=-1):
        """Get the window associated with the line at the index_line.

        The window is built by walking backwards from the line just before
        ``index_line`` and keeping the logs whose pattern is valid, differs
        from the pattern of the selected log, and whose embedding is a numpy
        value. If the beginning of the dataset is reached before the window
        is full, the window is padded with the last collected embedding and
        the padded positions get the index -1.

        Args:
            index_line (int, optional): index of the line. Must match the
                ``index_line`` attribute of the log stored at that position.
                Defaults to -1.

        Returns:
            Log: return the log object with the ``slice`` and ``index_slice``
            attributes added. If an error occurs, return -1.

        Raises:
            AssertionError: if the log stored at ``index_line`` does not carry that index.
        """
        first_log = self.list_logs[index_line]
        # Just to be sure
        assert first_log.index_line == index_line

        # Get the pattern of the log
        self.parser.get_pattern(first_log)
        # Issue with the pattern
        if first_log.pattern.id == -1:
            logger.error(str("Log: " + str(first_log.message) + " is not usable due to pattern"))
            return -1
        # Cardinality is higher than the higher cardinality learned during the learning step
        if first_log.cardinality >= cardinality_max:
            logger.error(str("Log: " + str(first_log.message) + " is not usable due to cardinality higher than card max"))
            return -1
        # Get the embedding
        self.parser.get_w2v(first_log)
        if not self._has_numpy_type(first_log.vector):
            logger.error(str("Log: " + str(first_log.message) + " is not usable due to word2vec"))
            return -1

        list_inputs = []
        list_index_inputs = []
        index_inverted = index_line - 1
        # Run until the window is filled
        while len(list_inputs) != self.window_size:
            # The lower bound is checked BEFORE indexing: a negative index
            # would silently wrap around to the end of the list and pull a
            # later log into the window.
            if index_inverted < 0:
                if not list_inputs:
                    logger.warning(str("List embedding empty: " + str(first_log.message) + "is not usable due to slice"))
                    return -1
                # Padding: repeat the oldest collected embedding.
                missing = self.window_size - len(list_inputs)
                list_inputs.extend([list_inputs[-1]] * missing)
                list_index_inputs.extend([index_inverted] * missing)
                break
            # Get the previous log and its pattern
            log_previous = self.list_logs[index_inverted]
            self.parser.get_pattern(log_previous)
            # Keep it only if its pattern is valid and different from the selected log
            if first_log.pattern.id != log_previous.pattern.id and log_previous.pattern.id != -1:
                # Get its embedding and add it to the window when it is valid
                self.parser.get_w2v(log_previous)
                if self._has_numpy_type(log_previous.vector):
                    list_inputs.append(log_previous.vector)
                    list_index_inputs.append(index_inverted)
            # Select the previous log
            index_inverted -= 1

        # If we have enough data, return the Log with the slice added.
        if len(list_inputs) == self.window_size:
            first_log.slice = list_inputs
            first_log.index_slice = list_index_inputs
            return first_log
        # Else, return -1
        logger.warning(str("Log: " + str(first_log.timestamp) + " " + str(first_log.message) + "is not usable due to slice"))
        return -1

    def show_selected_lines(self, index_line: int, range_line=100):
        """Show the selected and the range_line previous lines

        Lines are printed in file order, ending with the selected line.
        Positions before the start of the dataset are skipped.

        Args:
            index_line (int):the index of the selected line
            range_line (int, optional): number of previous lines to print. Defaults to 100.
        """
        logger.info("Selected lines are : ")
        start = max(index_line - range_line, 0)
        for index in range(start, index_line + 1):
            print(self.list_logs[index])

    def __len__(self):
        """Return the size of the dataset

        Returns:
            int: number of logs of the dataset
        """
        return len(self.list_logs)