Source code for logio.core.analysis_convert

try:
    import pandas as pd
    from pandas import DataFrame
    import numpy as np
    import scipy.stats as stats
    from typing import Union, Any, List
    import os
    import json
    import lasio
    import time
    from collections import defaultdict

except ImportError:
    print("Install the libraries before importing")

class DataNotFoundException(Exception):
    """Data not found"""
    pass

class InvalidFormatException(Exception):
    """Format does not conform with the required format"""
    pass

[docs]class Analysis: """" This class reads and performs basic data analysis. It houses five methods with their functionalities bordering on: | reading of files {csv, xlsx, json, las}; | returning a statistical description of the file; | returning a correlation matrix of the numerical features; | returning a cross tab of categorical features specified; | returning a frequency table of categorical features specified. """
[docs] def __init__(self): "No global attribute to initialize."
[docs] def read_file(self, filename: str) -> pd.DataFrame: """ reads in file in different supported formats into a DataFrame params: `filename -> the name of the file to read` returns: A dataframe """ formats = [ "xlsx", "csv", "las", "json" ] if filename.split(".")[-1].lower() not in formats: raise InvalidFormatException(f"File {filename} not in the allowed format") cur_path = os.getcwd() full_path = cur_path + "/" + filename if filename.endswith(".LAS") or filename.endswith(".las"): data = lasio.read(full_path).df() data = data.reset_index() return data if filename.endswith(".json"): data = pd.read_json(filename) elif filename.endswith(".csv"): data = pd.read_csv(filename) elif filename.endswith(".xlsx"): data = pd.read_excel(filename) return data
[docs] def describe(self, data): ''' this function returns the mean,median,min,max,skewness,kurtosis and Jarque Bera of numerical columns of a dataset.\n Note: -------\n Jarque Bera test will return NaN for columns with missing values Parameters: ------------\n data: The dataset to be analysed in csv,xlsx or las format.\n Returns: ---------\n returns the mean,median,min,max,skewness,kurtosis and Jarque Bera summary of a dataset. ''' #specifying the datatype == number df = data.select_dtypes(include="number") #creating a dictionary of the data description df_mean = df.mean().to_dict() df_median = df.median().to_dict() df_std = df.std().to_dict() df_min = df.min().to_dict() df_max = df.max().to_dict() df_skew = df.skew().to_dict() df_kurt = df.kurtosis().to_dict() df_jarque = {} for key in df.columns: result = stats.jarque_bera(df[key]) df_jarque[key] = result.statistic #creating a dataframe that will be render witht the description of our data describe = pd.DataFrame([ list(df_mean.values()), list(df_median.values()), list(df_std.values()), list(df_min.values()), list(df_max.values()), list(df_skew.values()), list(df_kurt.values()), list(df_jarque.values())],columns=list(df_mean.keys()),\ index=['mean','median','std','min','max','skewness','kurtosis','Jarque_bera']) #returning the dataframe return describe
[docs] def correlate(self, data, method : str = 'pearson'): """ Compute pairwise correlation of columns in a dataframe or lasio.las.LASFile, excluding NA/null and non-numerical values. Parameters ---------- data: method : {'pearson', 'kendall', 'spearman'} Method of correlation: * pearson : standard correlation coefficient * kendall : Kendall Tau correlation coefficient * spearman : Spearman rank correlation Returns ------- DataFrame Correlation matrix. """ # Confirm the type of the input data if type(data) == lasio.las.LASFile: df = data.df() elif type(data) == DataFrame: df = data else: raise InvalidFormatException('Data input is neither a lasio.las.LASFile nor DataFrame object') # Drop NA values from the dataframe df.dropna(inplace = True) # Selects the numerical columns in the dataframe new_df = df.select_dtypes(include = 'number') return new_df.corr(method = method)
[docs] def cross_tab(self, data, row, column): """ returns a crosstab for only two selected categorical feature in the dataset.\n Parameters: ----------\n data : Dataset in csv,xlsx, json or las format. The dataset to extract the crosstab of the selected categorical features (columns)\n row : The first selected categorical features(column)\n column: The second selected categorical feature(column) Returns: -------\n return a crosstab for two selected categorical feature in the dataset.\n """ #a sub function to use pandas crosstab under the hood def create_crosstab(data,row,column): tab = pd.crosstab(data[row],data[column]) cat_col = [col for col in data.columns if (data[col].dtype != 'int64' and data[col].dtype !='float64')] if (row in cat_col and column in cat_col): return tab elif (row not in cat_col or column not in cat_col): raise TypeError ('Unsupported column type, Input a column with categorical variables') #called the subfunction crosstab = create_crosstab(data=data,row=row,column=column) return crosstab
[docs] def frequencyTable(self, data: Union[pd.DataFrame, pd.Series, Any], col_names: Union[str, List[str]]) -> Union[pd.DataFrame,List[pd.DataFrame]]: """ Computes a frequency table for the categorical variables in the dataframe. params: `data -> pandas dataframe that the frequency table is computed from.` `col_names -> a string or a list of strings indicating the columns that contains the categorical data` returns: a pandas dataframe that contains the frequency table. """ if data is None or col_names is None: raise DataNotFoundException("No data was provided or no column names was provided") if isinstance(col_names, str): if data[col_names].dtype != np.dtype("object"): raise InvalidFormatException("The column specified is not a categorical column") target = data[col_names] try: target.isnull().sum() == 0 uniques = sorted(target.unique()) except TypeError: print('Missing values detected in one or both of the columns selected') print('The frequency and percentage of the missing value will not be accounted for') uniques = target.unique() frequencies = defaultdict(list) for val in uniques: for value in target.values: if val == value: frequencies[val].append(1) count = sum(i for i in frequencies[val]) frequencies[val] = count names = [i[0] for i in list(frequencies.items())] values = [i[1] for i in list(frequencies.items())] freq = [v / sum(values) * 100 for v in values] return pd.DataFrame({'frequencies': values, "percentage":freq}, index=[names]) else: temp = [] for col in col_names: if data[col].dtype != np.dtype("object"): raise InvalidFormatException("The column specified is not a categorical column") target = data[col] try: target.isnull().sum() == 0 uniques = sorted(target.unique()) except TypeError: print('Missing values detected in one or both of the columns selected') print('The frequency and percentage of the missing value will not be accounted for') uniques = target.unique() frequencies = defaultdict(list) for val in uniques: for value in target.values: if val == value: frequencies[val].append(1) count = sum(i for i in frequencies[val]) frequencies[val] = count names = [i[0] for i in list(frequencies.items())] values = [i[1] for i in list(frequencies.items())] freq = [v / sum(values) * 100 for v in values] temp.append(pd.DataFrame({'frequencies': values, "percentage":freq}, index=[names])) return temp
[docs]class FileConverter(): """ This class converts files from one format to another. The allowed extensions are tailored towards possible well-log data formats: |csv, |xlsx, |json |las """
[docs] def __init__(self, filename, output_format): "Initialize filename and output_format attribute." self.filename = filename self.output_format = str.lower(output_format) self.input_format = str.lower(filename.rsplit('.')[1])
[docs] def convert_file(self): """ This method takes in a data format, and returns the data in a format specified by the user, in the current working directory. param_definition --------------- input_format : the input file format. output_format : the specified output file format ('csv, xlsx, json, las') The csv format is the central format: for any conversion from an input_format to an output_format other than csv, the input_format is first converted to a csv, and afterwards, converted to the required output file format. """ ALLOWED_EXTENSIONS = {"csv", "xlsx", "json", "las"} print(f'The allowed file extensions are {ALLOWED_EXTENSIONS}') print('\nThe data to be read has to be in the current working directory') filename = self.filename # The following methods seek to convert files from csv to the specified output format def csv_to_xlsx(self, df=None): """converts from csv to xlsx""" fn = f"{self.filename.rsplit('.')[0]}.xlsx" if self.input_format == 'csv': data = pd.read_csv(self.filename) filename = str.lower(self.filename) data.to_excel(fn, index=False) return fn else: df.to_excel(fn, index=False) return fn def csv_to_json(self, df=None): """converts from csv to json""" fn = f"{self.filename.rsplit('.')[0]}.json" if self.input_format == 'csv': data = pd.read_csv(self.filename) filename = str.lower(self.filename) data.to_json(fn) return fn else: df.to_json(fn) return fn def csv_to_las(self, df=None): """converts from csv to las""" fn = f"{self.filename.rsplit('.')[0]}.las" if self.input_format == 'csv': data = pd.read_csv(self.filename) filename = str.lower(self.filename) las_file = lasio.LASFile() for col in data.columns: if data[col].dtype == np.dtype("object"): raise InvalidFormatException("unsupported: categorical columns detected") else: las_file.add_curve(col, data[col]) las_file.write(fn) return fn else: las_file = lasio.LASFile() for col in df.columns: if df[col].dtype == np.dtype("object"): raise InvalidFormatException("unsupported: categorical columns detected") else: las_file.add_curve(col, df[col]) las_file.write(fn) return fn # The following functions seek to convert from other file types to csv format. def xlsx_to_csv(self): """converts excel to csv""" df = pd.read_excel(self.filename) filename = str.lower(self.filename) fn = f"{filename.rsplit('.')[0]}.csv" if self.output_format == "csv": df.to_csv(fn, index=False) return fn else: return fn, df def json_to_csv(self): """converts json to csv""" df = pd.read_json(self.filename) filename = str.lower(self.filename) fn = f"{filename.rsplit('.')[0]}.csv" if self.output_format == "csv": df.to_csv(fn, index=False) return fn else: return fn, df def las_to_csv(self): """converts las to csv""" data = lasio.read(self.filename) df = data.df() df = df.reset_index() filename = str.lower(self.filename) fn = f"{filename.rsplit('.')[0]}.csv" if self.output_format == "csv": df.to_csv(fn, index=False) return fn else: return fn, df start_time = time.time() if (self.input_format in ALLOWED_EXTENSIONS and self.output_format in ALLOWED_EXTENSIONS): if self.input_format == 'csv': if self.output_format == 'csv': print('\nError: same input and output format specified.') elif self.output_format == 'xlsx': fn = csv_to_xlsx(self) print(f'\nDone. The output data {fn} has been saved in the current working directory') elif self.output_format == 'json': fn = csv_to_json(self) print(f'\nDone. The output data {fn} has been saved in the current working directory') elif self.output_format == 'las': fn = csv_to_las(self) print(f'\nDone. The output data {fn} has been saved in the current working directory') elif self.input_format == 'xlsx': if self.output_format == 'xlsx': print('\nError: same input and output format specified') elif self.output_format == 'csv': fn = xlsx_to_csv(self) print(f'\nDone. The output data {fn} has been saved in the current working directory') elif self.output_format == 'json': new_fn, df = xlsx_to_csv(self) self.filename = new_fn fn = csv_to_json(self, df) print(f'\nDone. The output data {fn} has been saved in the current working directory') elif self.output_format == 'las': new_fn, df = xlsx_to_csv(self) self.filename = new_fn fn = csv_to_las(self, df) print(f'\nDone. The output data {fn} has been saved in the current working directory') elif self.input_format == 'json': if self.output_format == 'json': print('\nError: same input and output format specified') elif self.output_format == 'csv': fn = json_to_csv(self) print(f'\nDone. The output data {fn} has been saved in the current working directory') elif self.output_format == 'xlsx': new_fn, df = json_to_csv(self) self.filename = new_fn fn = csv_to_xlsx(self, df) print(f'\nDone. The output data {fn} has been saved in the current working directory') elif self.output_format == 'las': new_fn, df = json_to_csv(self) self.filename = new_fn fn = csv_to_las(self, df) print(f'\nDone. The output data {fn} has been saved in the current working directory') elif self.input_format == 'las': if self.output_format == 'las': print('\nError: same input and output format specified') elif self.output_format == 'csv': fn = las_to_csv(self) print(f'\nDone. The output data {fn} has been saved in the current working directory') elif self.output_format == 'json': new_fn, df = las_to_csv(self) self.filename = new_fn fn = csv_to_json(self, df) print(f'\nDone. The output data {fn} has been saved in the current working directory') elif self.output_format == 'xlsx': new_fn, df = las_to_csv(self) self.filename = new_fn fn = csv_to_xlsx(self, df) print(f'\nDone. The output data {fn} has been saved in the current working directory') else: print("\nError: Unsupported file format. Check your input and output file format") end_time = time.time() time_lapsed = end_time-start_time print(f"\n {time_lapsed} seconds")