# Copyright 2020 BULL SAS All rights reserved #
from loguru import logger
from collections import Counter
from logflow.logsparser.Pattern import Pattern
from typing import Dict, List, Tuple
from loguru import logger
# TODO: Test performance of static vs method
class Journal:
    """A journal is a list of log files: it reads, parses and counts or associates the logs.

    Two modes are available, selected by ``associated_pattern``:

    * discovery (default): every line is reduced to a tuple of word descriptors
      and the occurrences of each tuple are counted in ``counter_logs``;
    * association: every line is matched against ``dict_patterns`` and the
      result is appended to ``list_patterns``.

    Args:
        parser_message (function): Function to split the message part of the line.
            It is called as ``parser_message(line=...)`` and must return the list of words.
        path (str): Path to the data. A list of paths is also accepted, the files are
            then read one after the other.
        associated_pattern (bool, optional): Associate (True) or discover (False) the patterns.
            If True, dict_patterns must be provided. Defaults to False.
        dict_patterns (dict, optional): Dict of the patterns for the association.
            Defaults to None, which means an empty dict.
        large_file (bool, optional): Optimization for the reading of one large file.
            Not implemented yet. Defaults to False.
        pointer (int, optional): Optimization for the reading of one large file.
            Not implemented yet. Defaults to -1.
        encoding (str, optional): Encoding of the files read. Defaults to "latin-1".
        sort_function (function, optional): Function to sort the logs. It receives the list of
            lines of a file and returns the lines to process. Only used in association mode.
            Defaults to "", which means the logs are not sorted.
        output (str, optional): Set the output type. "logpai" to be usable with the benchmark
            provided by logpai. Defaults to "", which stores only the id of the pattern.
    """

    def __init__(self, parser_message, path: str, associated_pattern=False, dict_patterns=None, large_file=False, pointer=-1, encoding="latin-1", sort_function="", output=""):
        # A fresh dict per instance: a literal {} default would be shared by every Journal.
        if dict_patterns is None:
            dict_patterns = {}
        # NOTE: validation is kept as assertions so that callers expecting
        # AssertionError keep working; be aware they are stripped under ``python -O``.
        assert path != "", "path must not be empty"
        assert parser_message != "", "parser_message must be provided"
        if associated_pattern:
            assert dict_patterns != {}, "dict_patterns is required when associated_pattern is True"
        if large_file:
            assert pointer != -1, "pointer is required when large_file is True"
        self.path = path
        self.pointer = pointer
        self.sort_function = sort_function
        self.parser_message = parser_message
        self.encoding = encoding
        # Cache: word -> descriptor vector (see filter_word).
        self.dict_words_descriptors: Dict[str, str] = {}
        # Cache: raw message (tuple of words) -> tuple of descriptors (see count_log).
        self.dict_message: Dict[Tuple[str, ...], Tuple[str, ...]] = {}
        self.associated_pattern = associated_pattern
        self.dict_patterns = dict_patterns
        self.output = output

    def run(self):
        """Start the process.

        In discovery mode, ``counter_logs`` is (re)built from the files and the
        intermediate caches are emptied afterwards. In association mode,
        ``list_patterns`` is (re)built from the files.
        """
        if self.associated_pattern:
            # We associate lines and patterns.
            self.dict_message_associated: Dict[Tuple[str, ...], Pattern] = {}
            self.list_patterns = []
            self.read_file()
            return
        # We discover the patterns.
        self.counter_logs = {}
        self.dict_message = {}
        self.read_file()
        # The caches are only useful while reading: drop their content to free memory.
        self.dict_words_descriptors = {}
        self.dict_message = {}

    def count_log(self, line: str):
        """Count the number of same entries according to their descriptors.

        Example using 3 entries::

            "Connexion of user Marc"
            "Connexion of user Marc"
            "Application failure node 42"

        counter_logs will be::

            {("Connexion", "of", "user", "Marc"): 2,
             ("Application", "failure", "node", "NB"): 1}

        To avoid useless computation, a dictionary of raw message -> descriptors is
        kept, so the descriptors are computed only once per distinct message.
        Lines for which the parser returns no word are ignored.

        Args:
            line (str): line of log to add to the counter.
        """
        # Parse the message to have the words.
        message: List[str] = self.parser_message(line=line)
        if not message:
            return
        # A list cannot be used as a dictionary key, a tuple can.
        frozen_message: Tuple[str, ...] = tuple(message)
        descriptors = self.dict_message.get(frozen_message)
        if descriptors is None:
            # First time this exact message is seen: compute and cache its descriptors.
            descriptors = tuple(self.filter_word(word) for word in message)
            self.dict_message[frozen_message] = descriptors
        # Exactly one increment per line, starting from 0 for a new tuple of descriptors.
        self.counter_logs[descriptors] = self.counter_logs.get(descriptors, 0) + 1

    def associate_pattern(self, line: str):
        """Associate a line with a pattern and add the result to the list of patterns.

        What is appended to ``list_patterns`` depends on ``output``: the pattern id
        for "", a dict with the keys 'Content', 'EventId' and 'EventTemplate' for
        "logpai". Any other value of ``output`` appends nothing.

        Args:
            line (str): line to be associated.
        """
        # Parse the message and turn each word into its descriptor.
        message = [self.filter_word(word) for word in self.parser_message(line=line)]
        if not message:
            return
        frozen_message = tuple(message)
        # If we have already seen the message, we know the pattern. Else, compute it.
        # ``is None`` is used on purpose: a pattern has a length and could be falsy.
        pattern = self.dict_message_associated.get(frozen_message)
        if pattern is None:
            pattern = Journal.find_pattern(message, self.dict_patterns)
            self.dict_message_associated[frozen_message] = pattern
        if self.output == "":
            self.list_patterns.append(pattern.id)
        elif self.output == "logpai":
            self.list_patterns.append({'Content': message, 'EventId': int(pattern.id), 'EventTemplate': pattern.pattern_str})

    def read_file(self):
        """Read the logs files.

        ``path`` is either one path (str) or an iterable of paths. A failure while
        reading or processing one file is logged and the next file is processed;
        the lines handled before the failure are kept.
        """
        paths = [self.path] if isinstance(self.path, str) else self.path
        for file_path in paths:
            try:
                with open(file_path, "r", encoding=self.encoding) as file_open:
                    self._process_lines(file_open)
            except Exception:
                # Best effort: keep going with the other files, but do not hide
                # KeyboardInterrupt/SystemExit and keep the cause in the log.
                logger.exception("Error while reading the file: " + str(file_path))

    def _process_lines(self, file_open):
        """Send every line of an opened file to the handler of the current mode."""
        if not self.associated_pattern:
            # Stream the file: the lines do not need to be all in memory to be counted.
            for line in file_open:
                self.count_log(line)
            return
        lines = file_open
        if self.sort_function != "":
            # Sorting needs all the lines at once.
            lines = self.sort_function(file_open.readlines())
        for line in lines:
            self.associate_pattern(line)

    def filter_word(self, word: str) -> str:
        """Get the descriptors of the word.

        Same rules as static_filter_word, with a per-instance cache of the
        computed vectors.

        Args:
            word (str): word to describe

        Returns:
            str: "NB" for a number, the word itself if it is alphabetic or one
            character long, else the vector of descriptors (see create_vector).
        """
        if self.is_number(word):
            return "NB"
        if word.isalpha() or len(word) == 1:
            return word
        descriptor = self.dict_words_descriptors.get(word)
        if descriptor is None:
            descriptor = Journal.create_vector(word)
            self.dict_words_descriptors[word] = descriptor
        return descriptor

    def is_number(self, s: str) -> bool:
        """Detect if a string is a float.

        Args:
            s (str): string to parse

        Returns:
            bool: True if the string is a float, False else.
        """
        return Journal.static_is_number(s)

    @staticmethod
    def find_pattern(message: List[str], dict_patterns: dict) -> "Pattern":
        """Find the pattern associated to a log.

        The best pattern is the pattern with the maximum common words with the line.

        Args:
            message (List[str]): list of the words of the message part of the log.
            dict_patterns (dict): the dict of patterns, indexed by the number of words
                of the message, then by the size of the patterns.

        Returns:
            Pattern: the pattern associated to the line. An empty default pattern is
            returned when no pattern has a common word with the line.

        Raises:
            KeyError: if dict_patterns has no entry for the number of words of the message.
        """
        # Create a default pattern to compare it to the other ones to find the best pattern.
        best_pattern = Pattern(0, [], [])
        # Get the patterns with the same cardinality as the line. The cardinality of a pattern
        # is the cardinality used for finding this pattern and not its number of words.
        dict_patterns_size = dict_patterns[len(message)]
        # Begin by the bigger patterns to save time.
        for size_pattern in sorted(dict_patterns_size.keys(), reverse=True):
            for pattern in dict_patterns_size[size_pattern]:
                # Compute the number of common words.
                nb_word_match = sum(
                    1
                    for i in range(len(pattern))
                    if pattern.pattern_word[i] == message[pattern.pattern_index[i]]
                )
                # If we have more common words, then we have a new best pattern.
                if nb_word_match > len(best_pattern):
                    best_pattern = pattern
            # If the current size is lower than the size of the actual best pattern,
            # stop the detection.
            if len(best_pattern) > size_pattern:
                break
        return best_pattern

    @staticmethod
    def static_is_number(s: str) -> bool:
        """Detect if a string is a float.

        Args:
            s (str): string to parse

        Returns:
            bool: True if the string is a float, False else.
        """
        try:
            float(s)
        except ValueError:
            return False
        return True

    @staticmethod
    def static_filter_word(word: str) -> str:
        """Get the descriptors of the word, without any cache.

        Args:
            word (str): word to describe

        Returns:
            str: "NB" for a number, the word itself if it is alphabetic or one
            character long, else the vector of descriptors (see create_vector).
        """
        if Journal.static_is_number(word):
            return "NB"
        if word.isalpha() or len(word) == 1:
            return word
        return Journal.create_vector(word)

    @staticmethod
    def create_vector(word: str) -> str:
        """Create the vector of descriptors associated to a word.

        The vector is a string made of four "0"/"1" flags followed by the length of
        the word. The flags tell, in this order, if the word contains an uppercase
        letter, a lowercase letter, a non alphanumeric character and a digit.
        For example "A-b" gives "11103".

        Args:
            word (str): the word to describe using descriptors

        Returns:
            str: the descriptors
        """
        upper = lower = symbol = digit = False
        for letter in word:
            if letter.isdigit():
                digit = True
            elif letter.islower():
                lower = True
            elif letter.isupper():
                upper = True
            elif not letter.isalnum():
                symbol = True
            # Every flag is set: the rest of the word cannot change the vector.
            if upper and lower and symbol and digit:
                break
        flags = "".join("1" if flag else "0" for flag in (upper, lower, symbol, digit))
        return flags + str(len(word))