Refactor Python code to class to file

draft_specifications
meerkat 2021-11-10 22:54:01 +11:00
parent 6aec64569d
commit c6a519c6b6
5 changed files with 1047 additions and 898 deletions

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,350 @@
from genericpath import getsize
import os
import uuid
import datetime
import getpass
import hashlib
import glob
import requests
import mimetypes
from mconfiguration import mConfiguration
from mlogging import mLogging
class mAttribute:
_oConfiguration = None
_Log = None
def __init__(self):
self._LogOpen = False
self._oConfiguration = mConfiguration()
self._Log = mLogging()
self._Log.SetConfig(self._oConfiguration.GetConfig("logPath"), self._oConfiguration.GetSoftwareName())
def SetConfig(self, Configuration):
self._oConfiguration = Configuration
self._Log.SetConfig(self._oConfiguration.GetConfig("logPath"), self._oConfiguration.GetSoftwareName())
def UpdateAttribute(self, Attributes, ACategory, AName, AFunction, Comparison, Value):
for attr in Attributes:
if attr["category"] == ACategory and attr["name"] == AName and attr["function"] == AFunction:
attr["comparison"] = Comparison
attr["value"] = Value
return Attributes
oAttribute = {
"category": ACategory,
"name": AName,
"function": AFunction,
"comparison": Comparison,
"value": Value
}
Attributes.append(oAttribute)
return Attributes
def NewDefaultAnyAttributes(self, anyFileName):
records = 0
anyFile = open(anyFileName,'r')
while True:
next_line = anyFile.readline()
if not next_line:
break;
records = records + 1
anyFile.close()
lattribute = []
oAttribute = {
"category": "dataset",
"name": "records",
"function": "count",
"comparison": "EQ",
"value": records
}
lattribute.append(oAttribute)
return lattribute
def NewDefaultCsvAttributes(self):
lattribute = []
oAttribute = {
"category": "dataset",
"name": "header",
"function": "count",
"comparison": "NA",
"value": 1
}
lattribute.append(oAttribute)
oAttribute = {
"category": "dataset",
"name": "footer",
"function": "count",
"comparison": "NA",
"value": 0
}
lattribute.append(oAttribute)
oAttribute = {
"category": "format",
"name": "separator",
"function": "value",
"comparison": "NA",
"value": ","
}
lattribute.append(oAttribute)
oAttribute = {
"category": "format",
"name": "columns",
"function": "value",
"comparison": "NA",
"value": ","
}
lattribute.append(oAttribute)
oAttribute = {
"category": "dataset",
"name": "records",
"function": "count",
"comparison": "NA",
"value": 0
}
lattribute.append(oAttribute)
oAttribute = {
"category": "dataset",
"name": "columns",
"function": "count",
"comparison": "NA",
"value": 0
}
lattribute.append(oAttribute)
return lattribute
def NewDefaultJsonAttributes(self):
lattribute = []
oAttribute = {
"category": "format",
"name": "list",
"function": "offset",
"comparison": "NA",
"value": ","
}
lattribute.append(oAttribute)
oAttribute = {
"category": "format",
"name": "columns",
"function": "value",
"comparison": "NA",
"value": ","
}
lattribute.append(oAttribute)
oAttribute = {
"category": "dataset",
"name": "records",
"function": "count",
"comparison": "NA",
"value": 0
}
lattribute.append(oAttribute)
oAttribute = {
"category": "dataset",
"name": "columns",
"function": "count",
"comparison": "NA",
"value": 0
}
lattribute.append(oAttribute)
return lattribute
def NewDefaultZipAttributes(self, CompressionType = "ZIP",Encryption = ""):
lattribute = []
oAttribute = {
"category": "format",
"name": "compression",
"function": "algo",
"comparison": "NA",
"value": CompressionType
}
lattribute.append(oAttribute)
oAttribute = {
"category": "format",
"name": "encryption",
"function": "algo",
"comparison": "NA",
"value": Encryption
}
lattribute.append(oAttribute)
oAttribute = {
"category": "dataset",
"name": "files",
"function": "count",
"comparison": "NA",
"value": 0
}
lattribute.append(oAttribute)
return lattribute
def SetAttributeValueString(self, Attributes, Category, Key, Function, Value, Comparison="EQ"):
for item in Attributes:
if item["category"] == Category and item["name"] == Key and item["function"] == Function:
if item["comparison"] == "NA" or item["comparison"] == Comparison:
item["comparison"] = Comparison
item["value"] = Value
return
# Add the attribute
oAttribute = {
"category": Category,
"name": Key,
"function": Function,
"comparison": Comparison,
"value": Value
}
Attributes.append(oAttribute)
return
def SetAttributeValueNumber(self, Attributes, Category, Key, Function, Value, Comparison = "EQ"):
for item in Attributes:
if item["category"] == Category and item["name"] == Key and item["function"] == Function:
if item["comparison"] == "NA" or item["comparison"] == Comparison:
item["comparison"] = Comparison
item["value"] = Value
return
# Add the attribute
oAttribute = {
"category": Category,
"name": Key,
"function": Function,
"comparison": Comparison,
"value": Value
}
Attributes.append(oAttribute)
return
def SetAttributes(self, PathFile, FileType, ExtendedAttributes):
lattribute = None
matched = False
if FileType == "csv":
matched = True
lattribute = self.NewDefaultCsvAttributes()
if ExtendedAttributes:
delimiter = ","
rowCount = 0
colCount = 0
#TODO check import
import csv
with open(PathFile, 'r') as csvfile:
datareader = csv.reader(csvfile, delimiter=",")
for row in datareader:
if len(row) > colCount:
colCount = len(row)
rowCount = rowCount + 1
self.SetAttributeValueNumber(lattribute, Category="dataset", Key="records", Function="count", Value=rowCount)
self.SetAttributeValueNumber(lattribute, Category="dataset", Key="columns", Function="count", Value=colCount)
if FileType == "txt":
matched = True
lattribute = self.NewDefaultCsvAttributes()
if ExtendedAttributes:
rowCount = 0
colCount = 0
#TODO check import
import csv
with open(PathFile, 'r') as csvfile:
datareader = csv.reader(csvfile, delimiter="\t")
for row in datareader:
if len(row) > colCount:
colCount = len(row)
rowCount = rowCount + 1
self.SetAttributeValueNumber(lattribute, Category="dataset", Key="records", Function="count", Value=rowCount)
self.SetAttributeValueNumber(lattribute, Category="dataset", Key="columns", Function="count", Value=colCount)
if FileType == "json":
matched = True
lattribute = self.NewDefaultJsonAttributes()
if FileType == "zip":
matched = True
lattribute = self.NewDefaultZipAttributes("ZIP")
if ExtendedAttributes:
self.SetAttributeValueNumber(lattribute, "dataset", "files", "count", 0, Comparison="NA")
if FileType == "7z":
matched = True
lattribute = self.NewDefaultZipAttributes("7Z")
if ExtendedAttributes:
self.SetAttributeValueNumber(lattribute, "dataset", "files", "count", 0, Comparison="NA")
if not matched:
lattribute = self.NewDefaultAnyAttributes(PathFile)
if lattribute == None:
lattribute = []
return lattribute

View File

@ -0,0 +1,354 @@
import ftplib
from genericpath import getsize
import os
import uuid
import json
import datetime
import getpass
import hashlib
import glob
import argparse
from configparser import ConfigParser
import requests
import mimetypes
#from source.python.client.mlogging import mLogging
from mlogging import mLogging
class mConfiguration:
_SoftwareVersion = "0.0.1"
_default_metaFile = "##marti##.json"
_oSoftware = {
"extension": "software",
"softwareName": "MARTILQREFERENCE",
"author": "Meerkat@merebox.com",
"version": "0.0.1"
}
_oConfiguration = None
_Log = None
def GetSoftwareName(self):
return "MARTILQREFERENCE"
def __init__(self):
self._oConfiguration = {
"softwareName": self.GetSoftwareName(),
"softwareAuthor": "Meerkat@merebox.com",
"softwareVersion": self._SoftwareVersion,
"logPath": "./logs/",
"dateFormat": "2006-01-02",
"dateTimeFormat": "2006-01-02T15:04:05+0100",
"dataPath": "",
"temPath": "",
"tags": None,
"publisher": "",
"contactPoint": "",
"license": "",
"accessLevel": "Confidential",
"rights": "Restricted",
"batch": 1.0000,
"batchInc": 0.0001,
"theme": "",
"author": "",
"title": "{{documentName}}",
"state": "active",
"expires": "m:7:0:0",
"version": "1.0",
"urlPrefix": "",
"encoding": "",
"compression": "",
"encryption": "",
"describedBy": "",
"landingPage": "",
"hashAlgorithm": "SHA256",
"signKey_File": None,
"signKey_Password": None,
"proxy": None,
"proxy_User": None,
"proxy_Credential": None,
"loaded": False
}
self._Log = mLogging()
self._Log.SetConfig(self._oConfiguration["logPath"], self.GetSoftwareName())
def LoadConfig(self, ConfigPath=None):
config_object = ConfigParser()
if not ConfigPath is None:
if os.path.exists(ConfigPath):
config_object.read(ConfigPath)
else:
self._Log.WriteLog("Configuration path '{}' does not exist".format(ConfigPath))
raise Exception("Configuration path '{}' does not exist".format(ConfigPath))
else:
# Look in default location and name
home = os.path.expanduser('~')
if os.path.exists(os.path.join(home, ".martilq/martilq.ini")):
ConfigPath = os.path.join(home, ".martilq/martilq.ini")
if os.path.exists("martilq.ini"):
ConfigPath = "martilq.ini"
if not ConfigPath is None:
self._Log.WriteLog("Usig configuration path '{}'".format(ConfigPath))
config_object.read(ConfigPath)
if config_object.has_section("General"):
items = config_object["General"]
if not items is None:
config_attr = ["logPath", "tempPath", "dataPath", "dateFomat", "dateTimeFormat"]
for x in config_attr:
try:
if not items[x] is None and not items[x] == "":
self._oConfiguration[x] = items[x]
except Exception:
self._Log.WriteLog("Error in config ignored: " + x)
self._Log.SetConfig(self._oConfiguration["logPath"], self.GetSoftwareName())
if config_object.has_section("MartiLQ"):
items = config_object["MartiLQ"]
if not items is None:
config_attr = ["rights", "accessLevel", "tags","publisher","batch","theme", "license","contactPoint"]
for x in config_attr:
try:
if not items[x] is None and not items[x] == "":
self._oConfiguration[x] = items[x]
except Exception:
self._Log.WriteLog("Error in config ignored: " + x)
if config_object.has_section("Resources"):
items = config_object["Resources"]
if not items is None:
config_attr = ["state", "author", "title", "expires","encoding", "version", "urlPrefix", "compression", "encryption", "describedBy", "landingPage"]
for x in config_attr:
try:
if not items[x] is None and not items[x] == "":
self._oConfiguration[x] = items[x]
except Exception:
self._Log.WriteLog("Error in config ignored: " + x)
if config_object.has_section("Hash"):
items = config_object["Hash"]
if not items is None:
config_attr = ["hashAlgorithm", "signKey_File", "signKey_Password"]
for x in config_attr:
try:
if not items[x] is None and not items[x] == "":
self._oConfiguration[x] = items[x]
except Exception:
self.WriteLog("Error in config ignored: " + x)
if config_object.has_section("Network"):
items = config_object["Network"]
if not items is None:
config_attr = ["proxy", "proxy_User", "proxy_Credential"]
for x in config_attr:
try:
if not items[x] is None and not items[x] == "":
self._oConfiguration[x] = items[x]
except Exception:
self._Log.WriteLog("Error in config ignored: " + x)
# Now check environmental values
self._oConfiguration["signKey_File"] = os.getenv("MARTILQ_SIGNKEY_FILE", self._oConfiguration["signKey_File"])
self._oConfiguration["signKey_Password"] = os.getenv("MARTILQ_SIGNKEY_PASSWORD", self._oConfiguration["signKey_Password"])
self._oConfiguration["logPath"] = os.getenv("MARTILQ_LOGPATH", self._oConfiguration["logPath"])
self._Log.WriteLog("Configuration load processed")
def SaveConfig(self, ConfigPath=None):
if not os.path.isfile(ConfigPath):
cfgfile = open(ConfigPath, 'w')
config_object = ConfigParser()
config_object.add_section("General")
config_attr = ["logPath", "tempPath", "dataPath", "dateFomat", "dateTimeFormat"]
for x in config_attr:
try:
if x in self._oConfiguration:
if self._oConfiguration[x] is None:
config_object.set("General", x, "")
elif type(self._oConfiguration[x]) is float or type(self._oConfiguration[x]) is int:
config_object.set("General", x, str(self._oConfiguration[x]))
else:
config_object.set("General", x, self._oConfiguration[x])
except Exception as e:
self._Log.WriteLog("Error in config ignored: " + x + " = " + str(e))
config_object.add_section("MartiLQ")
config_attr = ["rights", "accessLevel", "tags","publisher","batch","theme", "license","contactPoint"]
for x in config_attr:
try:
if x in self._oConfiguration:
if self._oConfiguration[x] is None:
config_object.set("MartiLQ", x, "")
elif type(self._oConfiguration[x]) is float or type(self._oConfiguration[x]) is int:
config_object.set("MartiLQ", x, str(self._oConfiguration[x]))
else:
config_object.set("MartiLQ", x, self._oConfiguration[x])
except Exception as e:
self._Log.WriteLog("Error in config ignored: " + x + " = " + str(e))
config_object.add_section("Resources")
config_attr = ["state", "author", "title", "expires","encoding", "version", "urlPrefix", "compression", "encryption", "describedBy", "landingPage"]
for x in config_attr:
try:
if x in self._oConfiguration:
if self._oConfiguration[x] is None:
config_object.set("Resources", x, "")
elif type(self._oConfiguration[x]) is float or type(self._oConfiguration[x]) is int:
config_object.set("Resources", x, str(self._oConfiguration[x]))
else:
config_object.set("Resources", x, self._oConfiguration[x])
except Exception as e:
self._Log.WriteLog("Error in config ignored: " + x + " = " + str(e))
config_object.add_section("Hash")
config_attr = ["hashAlgorithm", "signKey_File", "signKey_Password"]
for x in config_attr:
try:
if x in self._oConfiguration:
if self._oConfiguration[x] is None:
config_object.set("Hash", x, "")
elif type(self._oConfiguration[x]) is float or type(self._oConfiguration[x]) is int:
config_object.set("Hash", x, str(self._oConfiguration[x]))
else:
config_object.set("Hash", x, self._oConfiguration[x])
except Exception as e:
self._Log.WriteLog("Error in config ignored: " + x + " = " + str(e))
config_object.add_section("Network")
config_attr = ["proxy", "proxy_User", "proxy_Credential"]
for x in config_attr:
try:
if x in self._oConfiguration:
if self._oConfiguration[x] is None:
config_object.set("Network", x, "")
elif type(self._oConfiguration[x]) is float or type(self._oConfiguration[x]) is int:
config_object.set("Network", x, str(self._oConfiguration[x]))
else:
config_object.set("Hash", x, self._oConfiguration[x])
except Exception as e:
self._Log.WriteLog("Error in config ignored: " + x + " = " + str(e))
config_object.add_section("Custom_Spatial")
config_attr = ["enabled", "country", "region", "town"]
for x in config_attr:
config_object.set("Custom_Spatial", x, "")
config_object.add_section("Custom_Temporal")
config_attr = ["enabled", "businessDate", "runDate"]
for x in config_attr:
config_object.set("Custom_Temporal", x, "")
config_object.write(cfgfile)
cfgfile.close()
self._Log.WriteLog("Configuration save processed")
return True
else:
self._Log.WriteLog("Configuration file exists and new not saved")
return False
def SetConfig(self, Key=None, Value=None):
if not Key is None:
self._oConfiguration[Key] = Value
def GetConfig(self, Key=None):
try:
if self._oConfiguration is None:
return None
if not Key is None:
if Key == "tags":
return self._oConfiguration[Key].split(",")
return self._oConfiguration[Key]
else:
return None
except Exception:
self._Log.WriteLog("Error in getting config: "+ Key)
return None
def ExpireDate(self, sourcePath): # time.Time
expires = datetime.datetime.today()
lExpires = self._oConfiguration["expires"].split(":")
if len(lExpires) != 4 and len(lExpires) != 7:
raise Exception("Expires value '"+ self._oConfiguration["expires"] +"' is invalid")
base = lExpires[0]
if sourcePath == "" or base == "m":
base = "t"
modified = datetime.datetime.today()
if base == "m":
try:
mtime = os.path.getmtime(sourcePath)
except OSError:
mtime = 0
modified = datetime.datetime.fromtimestamp(mtime)
lExpire = [0, 0, 0]
lExpire[0] = int(lExpires[1])
lExpire[1] = int(lExpires[2])
lExpire[2] = int(lExpires[3])
if len(lExpires) > 4:
lExpireD = [0,0,0]
lExpireD[0] = int(lExpires[4])
lExpireD[1] = int(lExpires[5])
lExpireD[2] = int(lExpires[6])
if base == "m":
expires = modified + datetime.timedelta(years=lExpire[0],months=lExpire[1],days=lExpire[2],hours=lExpireD[0], minutes=lExpireD[1], seconds=lExpireD[2])
elif base == "r":
expires = self._oConfiguration.temporal.RunDate + datetime.timedelta(years=lExpire[0],months=lExpire[1],days=lExpire[2],hours=lExpireD[0], minutes=lExpireD[1], seconds=lExpireD[2])
elif base == "b":
expires = self._oConfiguration.temporal.BusinessDate + datetime.timedelta(years=lExpire[0],months=lExpire[1],days=lExpire[2],hours=lExpireD[0], minutes=lExpireD[1], seconds=lExpireD[2])
#elif base == "t":
# fallthrough
else:
expires = datetime.datetime.today() + datetime.timedelta(years=lExpire[0],months=lExpire[1],days=lExpire[2],hours=lExpireD[0], minutes=lExpireD[1], seconds=lExpireD[2])
else:
if base == "m":
expires = modified + datetime.timedelta(years=lExpire[0],months=lExpire[1],days=lExpire[2])
elif base == "r":
expires = self._oConfiguration.temporal.RunDate + datetime.timedelta(years=lExpire[0],months=lExpire[1],days=lExpire[2])
elif base == "b":
expires = self._oConfiguration.temporal.BusinessDate + datetime.timedelta(years=lExpire[0],months=lExpire[1],days=lExpire[2])
#elif base == "t":
# fallthrough
else:
expires = datetime.datetime.today() + datetime.timedelta(days=lExpire[2])
expires = expires.replace(year=expires.year+lExpire[0])
if expires.month+lExpire[1] > 12:
expires = expires.replace(year=expires.year+1)
expires = expires.replace(month=expires.month+lExpire[1]-12)
else:
expires = expires.replace(month=expires.month+lExpire[1])
return expires

View File

@ -0,0 +1,75 @@
import os
import datetime
class mLogging:
_LogOpen = False
_LogPath = None
_SoftwareName = "MARTILQREFERENCE"
def SetConfig(self, LogPath, SoftwareName):
self._LogPath = LogPath
self._SoftwareName = SoftwareName
def GetLogName(self):
today = datetime.datetime.today()
dateToday = today.strftime("%Y-%m-%d")
if None == self._LogPath or self._LogPath == "":
return None
if not os.path.exists(self._LogPath):
os.mkdir(self._LogPath)
logName = self._SoftwareName + "_" + dateToday + ".log"
return os.path.join(self._LogPath, logName)
def WriteLog(self, LogEntry):
sFullPath = self.GetLogName()
today = datetime.datetime.today()
dateToday = today.strftime("%Y-%m-%dT%H:%M:%S")
if None != sFullPath and sFullPath != "":
if not os.path.exists(sFullPath):
print("Log path: {}".format(sFullPath))
filec = open(sFullPath, "a")
else:
filec = open(sFullPath, "a")
filec.write(dateToday)
filec.write(".")
filec.write(LogEntry)
filec.write("\n")
filec.close()
def OpenLog(self):
if not self._LogOpen:
today = datetime.datetime.today()
dateToday = today.strftime("%Y-%m-%d")
self.WriteLog("***********************************************************************************")
self.WriteLog("* Start of processing: {}".format(dateToday))
self.WriteLog("***********************************************************************************")
self._LogOpen = True
def CloseLog(self):
if self._LogOpen:
today = datetime.datetime.today()
dateToday = today.strftime("%Y-%m-%d")
self.WriteLog("***********************************************************************************")
self.WriteLog("* End of processing: {}".format(dateToday))
self.WriteLog("***********************************************************************************")
self._LogOpen = False

View File

@ -0,0 +1,212 @@
from genericpath import getsize
import os
import uuid
import datetime
import getpass
import hashlib
import glob
import requests
import mimetypes
from mconfiguration import mConfiguration
from mlogging import mLogging
from mattribute import mAttribute
class mResource:
_oConfiguration = None
_Log = None
def __init__(self):
self._LogOpen = False
self._oConfiguration = mConfiguration()
self._Log = mLogging()
self._Log.SetConfig(self._oConfiguration.GetConfig("logPath"), self._oConfiguration.GetSoftwareName())
def SetConfig(self, Configuration):
self._oConfiguration = Configuration
self._Log.SetConfig(self._oConfiguration.GetConfig("logPath"), self._oConfiguration.GetSoftwareName())
def GetContentType(self, SourcePath):
ext = None
# Some overrides
match = str(os.path.splitext(SourcePath)[1][1:]).lower()
if (ext is None or ext == "") and match == "csv":
ext = "text/csv"
if ext is None or ext == "":
ext, _ = mimetypes.guess_type(SourcePath, strict=True)
if ext is None or ext == "":
match = str(os.path.splitext(SourcePath)[1][1:]).lower()
if match == "md":
ext = "application/markdown"
if ext is None or ext == "":
ext = "application/vnd.unknown." + os.path.splitext(SourcePath)[1][1:]
return ext
def NewMartiLQResource(self, SourcePath, UrlPath, ExcludeHash, ExtendAttributes):
today = datetime.datetime.today()
dateToday = today.strftime("%Y-%m-%dT%H:%M:%S")
self._MartiErrorId = ""
self._Log.OpenLog()
self._Log.WriteLog("Function 'NewMartiLQResource' parameters follow")
self._Log.WriteLog("Parameter: SourcePath Value: {}".format(SourcePath))
self._Log.WriteLog("Parameter: UrlPath Value: {}".format(UrlPath))
self._Log.WriteLog("Parameter: ExcludeHash Value: {}".format(ExcludeHash))
self._Log.WriteLog("")
if os.path.exists(SourcePath):
item = os.path.basename(SourcePath)
self._Log.WriteLog("Define file {}".format(SourcePath))
HashAlgorithm = self._oConfiguration.GetConfig("hashAlgorithm")
try:
mtime = os.path.getmtime(SourcePath)
except OSError:
mtime = 0
last_modified_date = datetime.datetime.fromtimestamp(mtime).strftime("%Y-%m-%dT%H:%M:%S")
if ExcludeHash:
hash = None
else:
hash = self.NewMartiLQHash(Algorithm=HashAlgorithm, FilePath=SourcePath, Value="", Sign=self._oConfiguration.GetConfig("signKey_File"))
oAttr = mAttribute()
oAttr.SetConfig(self._oConfiguration)
lattribute = oAttr.SetAttributes(SourcePath, str(os.path.splitext(SourcePath)[1][1:]).lower(), ExtendAttributes)
sTitle = self._oConfiguration.GetConfig("title")
if sTitle == "{{documentName}}":
sTitle = item.replace(os.path.splitext(SourcePath)[1], "")
if sTitle == "{{documentName.ext}}":
sTitle = item
oResource = {
"title": sTitle,
"uid": str(uuid.uuid4()),
"documentName": item,
"issueDate": dateToday,
"modified": last_modified_date,
"expires": self._oConfiguration.ExpireDate(item).strftime("%Y-%m-%dT%H:%M:%S%z"),
"state": self._oConfiguration.GetConfig("state"),
"author": self._oConfiguration.GetConfig("author"),
"length": os.path.getsize(SourcePath),
"hash": hash,
"description": "",
"url": self._oConfiguration.GetConfig("urlPrefix"),
"version": self._oConfiguration.GetConfig("version"),
"content-type": self.GetContentType(SourcePath),
"encoding": self._oConfiguration.GetConfig("encoding"),
"compression": self._oConfiguration.GetConfig("compression"),
"encryption": self._oConfiguration.GetConfig("encryption"),
"describedBy": self._oConfiguration.GetConfig("describedBy"),
"landingPage": self._oConfiguration.GetConfig("landingPage"),
"attributes": lattribute
}
if None != UrlPath and UrlPath != "":
if UrlPath[len(UrlPath)-1] == "/" or UrlPath[len(UrlPath)-1] == "\\":
oResource["url"] = UrlPath.replace("\\", "/") + item
else:
oResource["url"] = UrlPath.replace("\\", "/") + "/" + item
self._Log.WriteLog("Complete file {}".format(SourcePath))
else:
self._MartiErrorId = "MRI2001"
message = "Document '{}' not found or is a folder".format(SourcePath)
self._Log.WriteLog(message + " " + self._MartiErrorId)
raise Exception(message)
return oResource
def NewMartiLQHash(self, Algorithm, FilePath, Value="", Sign=""):
try:
signed = False
if Value == "" and FilePath != "":
if not Sign is None and not Sign == "" and not os.path.exists(Sign):
self._Log.WriteLog("Sign file '{}' not found".format(Sign))
Sign = ""
if Sign is None or Sign == "":
if Algorithm == "SHA256":
sha_hash = hashlib.sha256()
with open(FilePath,"rb") as fh:
for byte_block in iter(lambda: fh.read(4096),b""):
sha_hash.update(byte_block)
Value = sha_hash.hexdigest()
if Algorithm == "SHA512":
sha_hash = hashlib.sha512()
with open(FilePath,"rb") as fh:
for byte_block in iter(lambda: fh.read(4096),b""):
sha_hash.update(byte_block)
Value = sha_hash.hexdigest()
if Algorithm == "MD5":
sha_hash = hashlib.md5()
with open(FilePath,"rb") as fh:
for byte_block in iter(lambda: fh.read(4096),b""):
sha_hash.update(byte_block)
Value = sha_hash.hexdigest()
else:
import OpenSSL
from OpenSSL import crypto
import base64
private_key_file = open(Sign, "r")
privkey = private_key_file.read()
private_key_file.close()
password = self._oConfiguration.GetConfig("sigenKey_Password")
if privkey.startswith('-----BEGIN '):
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, privkey, password.encode('utf-8'))
else:
pkey = crypto.load_pkcs12(privkey, password).get_privatekey()
with open(FilePath,"rb") as fh:
sign = OpenSSL.crypto.sign(pkey, fh.read(), Algorithm)
Value = (base64.b64encode(sign)).decode('utf-8')
signed = True
oHash = {
"algo": Algorithm,
"value": Value,
"signed": signed
}
except Exception as e:
self._Log.WriteLog("Hash error for file {}: {}".format(FilePath, str(e)))
raise e
return oHash
def NewEncryption(self, Algorithm, Value):
oEncryption = {
"algo": Algorithm,
"value": Value
}
return oEncryption