Source code for logflow.logsparser.Cardinality

# Copyright 2020 BULL SAS All rights reserved #
from collections import Counter
from logflow.logsparser.Pattern import Pattern
from loguru import logger
from typing import Dict, List

[docs]class Cardinality: """A cardinality is a length of line. The length is defined as the number of words. Args: counter_general (dict): Counter of the different logs according in all the dataset. cardinality (int): Number of words """ def __init__(self, counter_general : dict, cardinality : int): self.counter_general = counter_general self.cardinality = cardinality self.dict_words : Dict[int, Dict[str, int]] = {} for i in range(self.cardinality): self.dict_words.setdefault(i, {}) self.list_pattern : List[Pattern] = []
[docs] def counter_word(self): """Count the number of words according to their place in the log. """ for entry in self.counter_general: position = 0 for word in entry: self.dict_words[position].setdefault(word,0) self.dict_words[position][word] += self.counter_general[entry] position += 1
[docs] def detect_patterns(self): """Detect the pattern based on the maximum number of similar words. """ for entry in self.counter_general: comparison_vector = [0]*self.cardinality position = 0 entry = list(entry) # Once the dict_words is created, we get the number of entries with the same word by only one access to the dictionnary. for word in entry: comparison_vector[position] += self.dict_words[position][word] position += 1 # We take the best subset of the similar words, i.e [10,10,2,2,2] keeps 2 as best subset. best_subset_words_number = Counter(comparison_vector).most_common(1)[0][0] # [(value, nb_value)] # We compute the index of the words kept best_subset_words_index = [i for i, e in enumerate(comparison_vector) if e == best_subset_words_number] # And the words theirself. best_subset_words_value = [entry[i] for i in best_subset_words_index] self.list_pattern.append(Pattern(self.cardinality, best_subset_words_value, best_subset_words_index)) self.list_pattern = list(set(self.list_pattern))
# logger.debug("Cardinality: " + str(self.cardinality) + " found " + str(len(self.list_pattern)) + " patterns")
[docs] def order_pattern(self): """Order the pattern by size to have a fast association between lines and patterns. """ for pattern in self.list_pattern: self.dict_patterns.setdefault(len(pattern), []) self.dict_patterns[len(pattern)].append(pattern)
[docs] def compute(self) -> Dict[int, List[Pattern]]: """Start the workflow for the multithreading implementation. Returns: (dict): the dict of patterns detected. """ self.counter_word() self.detect_patterns() self.dict_patterns : Dict[int, List[Pattern]]= {} self.order_pattern() return self.dict_patterns