note-online/SQLEasy.py
2021-04-22 19:00:03 +03:00

228 lines
9.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sqlite3
class SQLiteEasyException(Exception):
    """Error raised by the SQLEasy helpers for invalid arguments or state."""
def compareKey(DBlist, key, type_of_key=lambda x: x, hideIndex=True):
    """Re-index a list of row dictionaries by the value stored under *key*.

    Rows are visited in ascending order of ``row[key]``; when several rows
    share the same (converted) key value, the last one visited wins.

    Args:
        DBlist: A ``list`` of row dictionaries.
        key: Name of the field whose value becomes the key of the result.
        type_of_key: Callable applied to every key value before it is used
            as a dictionary key (identity by default).
        hideIndex: When true, the *key* field is left out of each row copy;
            when false, rows are copied unchanged.

    Returns:
        A dictionary mapping ``type_of_key(row[key])`` to a copy of the row.

    Raises:
        SQLiteEasyException: If *DBlist* is not exactly a ``list`` or if the
            first row does not contain *key*.
    """
    if type(DBlist) is not list:
        raise SQLiteEasyException(f"function compareKey need List object, unsupported type: {type(DBlist)}")
    # Only the first row is inspected; an empty list is accepted as-is.
    if DBlist and key not in DBlist[0]:
        raise SQLiteEasyException(f"key '{key}' not founded.")

    keep_index = not hideIndex
    indexed_rows = {}
    for record in sorted(DBlist, key=lambda item: item[key]):
        row_copy = {
            field: record[field]
            for field in record
            if keep_index or field != key
        }
        indexed_rows[type_of_key(record[key])] = row_copy
    return indexed_rows
def autoselectID_fromNew_item(DATABASE, TABLE, ID_KEY):
    """Return the smallest non-negative integer not used as an id in *TABLE*.

    The rows are read with ``DATABASE.getBase(TABLE)`` and indexed by
    *ID_KEY* through ``compareKey``, so the validation errors of those two
    calls propagate unchanged.

    Args:
        DATABASE: Object exposing ``getBase(table)`` that returns a list of
            row dictionaries.
        TABLE: Name of the table to inspect.
        ID_KEY: Name of the column holding the ids.

    Returns:
        The first value of 0, 1, 2, ... that no row uses as its id.
    """
    used_ids = set(compareKey(DATABASE.getBase(TABLE), ID_KEY))
    # A membership test (instead of walking the sorted ids and stopping at
    # the first mismatch) stays correct when the table also holds ids below
    # zero or non-integer ids: those are skipped rather than ending the
    # search early on a value that is actually taken.
    candidate = 0
    while candidate in used_ids:
        candidate += 1
    return candidate
class database:
    """Small convenience wrapper around one ``sqlite3`` connection.

    Rows are returned as dictionaries keyed by column name.  The object
    remembers the last table it worked with (``databaseChoosed``) so most
    methods can be called without naming the table again.

    Values are always passed to SQLite as bound parameters.  Table and
    column names cannot be bound and are interpolated into the statement
    text as given, so they must come from trusted code, never from user
    input.
    """

    # Apostrophes inside text values are stored as this placeholder and are
    # turned back into apostrophes by getBase().  The encoding is kept so the
    # stored representation of text stays the same for every writer.
    _QUOTE_TOKEN = "[SQLEasy_symbol:']"

    def __init__(self, PATH, DatabaseName=None):
        """Open the database file at *PATH*.

        Args:
            PATH: Path handed to ``sqlite3.connect``.
            DatabaseName: Optional name of the table to select initially.
        """
        self.ConnectedFile = sqlite3.connect(PATH, check_same_thread=False)
        self.databaseChoosed = DatabaseName
        self.ConnectedFile.row_factory = self.dict_factory
        self.act_commit = True

    def dict_factory(self, cursor, row):
        """Row factory: turn a result row into a ``{column name: value}`` dict."""
        return {column[0]: row[idx] for idx, column in enumerate(cursor.description)}

    def _encode_text(self, text):
        """Return *text* with every apostrophe replaced by the placeholder."""
        return text.replace("'", self._QUOTE_TOKEN)

    def _decode_text(self, text):
        """Return *text* with every placeholder turned back into an apostrophe."""
        return text.replace(self._QUOTE_TOKEN, "'")

    def _resolve_table(self, DatabaseName):
        """Return the table to use: *DatabaseName* or the remembered one.

        Raises:
            SQLiteEasyException: If no name is given and none is remembered.
        """
        if DatabaseName is not None:
            return DatabaseName
        if self.databaseChoosed is None:
            raise SQLiteEasyException("Database is not choosed")
        return self.databaseChoosed

    def _to_parameter(self, objectum, all_as_str=False):
        """Convert a Python value into the value that gets bound to a ``?``.

        With *all_as_str* every value becomes text: ``None`` becomes the
        empty string, booleans become ``"TRUE"``/``"FALSE"`` and numbers are
        formatted with ``%s``.

        Raises:
            SQLiteEasyException: If the value is not ``str``, ``None``,
                ``bool``, ``int`` or ``float``.
        """
        if type(objectum) is str:
            return self._encode_text(objectum)
        if objectum is None:
            return "" if all_as_str else None
        if type(objectum) is bool:
            if all_as_str:
                return "TRUE" if objectum else "FALSE"
            return objectum
        if type(objectum) in (int, float):
            return "%s" % objectum if all_as_str else objectum
        raise SQLiteEasyException(f"Unsupported type: {type(objectum)}")

    def encodeSQLiteType(self, objectum, all_as_str=False):
        """Render a Python value as an SQL literal.

        Kept for callers that build statements by hand; the methods of this
        class bind parameters instead.

        Args:
            objectum: ``str``, ``None``, ``bool``, ``int`` or ``float``.
            all_as_str: When true, every value is rendered as a quoted string.

        Returns:
            ``'NULL'``, ``'TRUE'``, ``'FALSE'``, a single-quoted string, or
            the number itself for ``int``/``float`` when *all_as_str* is
            false.

        Raises:
            SQLiteEasyException: If the type of *objectum* is not supported.
        """
        parameter = self._to_parameter(objectum, all_as_str)
        if parameter is None:
            return 'NULL'
        if type(parameter) is bool:
            return 'TRUE' if parameter else 'FALSE'
        if type(parameter) is str:
            # The placeholder itself contains an apostrophe, so the literal
            # is only valid SQL once apostrophes are doubled.
            return "'%s'" % parameter.replace("'", "''")
        return parameter

    def toggleCommit(self, value=None):
        """Set automatic commit to *value*, or flip it when *value* is not a boolean."""
        if value in (False, True):
            self.act_commit = value
        else:
            self.act_commit = not self.act_commit

    def commit(self):
        """Commit the current transaction."""
        self.ConnectedFile.commit()

    def close(self):
        """Close the underlying connection."""
        self.ConnectedFile.close()

    def getBase(self, DatabaseName=None, elementsFromDB='*'):
        """Return every row of a table as a list of dictionaries.

        The table that was read becomes the remembered table.

        Args:
            DatabaseName: Table to read; defaults to the remembered table.
            elementsFromDB: Column list placed after ``select``.

        Raises:
            SQLiteEasyException: If no table is given and none is remembered.
        """
        table = self._resolve_table(DatabaseName)
        cursor = self.ConnectedFile.cursor()
        cursor.execute(f"select {str(elementsFromDB)} from {table}")
        self.databaseChoosed = table
        rows = cursor.fetchall()
        for row in rows:
            for column in row:
                cell = row[column]
                if type(cell) is str:
                    row[column] = self._decode_text(cell)
        return rows

    def pop(self, key, value, DatabaseName=None):
        """Delete the rows whose column *key* equals *value*.

        A *value* of ``None`` deletes the rows where the column is NULL.

        Raises:
            SQLiteEasyException: If no table is given and none is remembered.
        """
        table = self._resolve_table(DatabaseName)
        if type(value) is str:
            value = self._encode_text(value)
        cursor = self.ConnectedFile.cursor()
        if value is None:
            cursor.execute(f'DELETE FROM {table} WHERE {key} IS NULL;')
        else:
            cursor.execute(f'DELETE FROM {table} WHERE {key} == ?;', (value,))
        if self.act_commit:
            self.ConnectedFile.commit()

    def setItem(self, key, newValue, indexKey, value, DatabaseName=None):
        """Set column *key* to *newValue* in the rows where *indexKey* matches *value*.

        A *value* of ``None`` selects the rows where *indexKey* is NULL.

        Raises:
            SQLiteEasyException: If a value has an unsupported type, or if no
                table is given and none is remembered.
        """
        new_parameter = self._to_parameter(newValue)
        match_parameter = self._to_parameter(value)
        table = self._resolve_table(DatabaseName)
        cursor = self.ConnectedFile.cursor()
        # None must be tested on the converted parameter, not on an SQL
        # literal: "= NULL" never matches any row.
        if match_parameter is None:
            cursor.execute(
                f'UPDATE {table} SET {key} = ? WHERE {indexKey} is NULL;',
                (new_parameter,),
            )
        else:
            cursor.execute(
                f'UPDATE {table} SET {key} = ? WHERE {indexKey} = ?;',
                (new_parameter, match_parameter),
            )
        if self.act_commit:
            self.ConnectedFile.commit()

    def add(self, values, DatabaseName=None):
        """Insert one row described by the mapping *values*.

        Every value is sent as text (see ``_to_parameter`` with
        ``all_as_str=True``).  If the plain insert fails, the statement is
        retried as ``INSERT OR IGNORE`` so a conflicting row is skipped
        silently; any other database error is raised by the retry.

        Raises:
            SQLiteEasyException: If a value has an unsupported type, or if no
                table is given and none is remembered.
        """
        table = self._resolve_table(DatabaseName)
        columns = ', '.join(str(column) for column in values)
        parameters = [self._to_parameter(values[column], all_as_str=True) for column in values]
        placeholders = ', '.join('?' for _ in parameters)
        cursor = self.ConnectedFile.cursor()
        try:
            cursor.execute(f'INSERT INTO {table} ({columns}) VALUES ({placeholders})', parameters)
        except sqlite3.Error:
            cursor.execute(f'INSERT OR IGNORE INTO {table} ({columns}) VALUES ({placeholders})', parameters)
        if self.act_commit:
            self.ConnectedFile.commit()

    def uploadFiles(self, binary, DatabaseName=None):
        """Validate the arguments of a binary upload.

        Warning: unfinished. Only the argument checks are implemented and
        nothing is written to the database yet.

        Raises:
            SQLiteEasyException: If *binary* is not ``bytes``/``bytearray``,
                or if no table is given and none is remembered.
        """
        if not (type(binary) is bytes or type(binary) is bytearray):
            raise SQLiteEasyException('You can upload only byte or bytearray types!!')
        self._resolve_table(DatabaseName)
        # TODO: finish the implementation.

    def chooseDataBase(self, DatabaseName):
        """Remember *DatabaseName* as the current table after reading it once."""
        # Reading the table first makes a wrong name fail here.
        self.getBase(DatabaseName)
        self.databaseChoosed = DatabaseName

    def currentIndex(self, key, value, DatabaseName=None):
        """Return the position of the first row with ``row[key] == value``.

        Positions refer to the rows sorted in ascending order of *key*.
        Returns ``None`` when no row matches.

        Raises:
            SQLiteEasyException: If no table is given and none is remembered.
        """
        table = self._resolve_table(DatabaseName)
        rows = sorted(self.getBase(table), key=lambda row: row[key])
        for position, row in enumerate(rows):
            if row[key] == value:
                return position
        return None

    def currentValue(self, key, value, DatabaseName=None):
        """Return the first row with ``row[key] == value``, or ``None``.

        Raises:
            SQLiteEasyException: If no table is given and none is remembered.
        """
        table = self._resolve_table(DatabaseName)
        # No sort is needed to find a match, and skipping it also avoids
        # comparison errors when the column holds NULLs.
        for row in self.getBase(table):
            if row[key] == value:
                return row
        return None

    def createColumn(self, columnName, tableName, DatabaseName=None):
        """Not implemented yet; always returns ``None``."""
        return

    def getTables(self):
        """Return the names of all tables listed in ``sqlite_master``."""
        cursor = self.ConnectedFile.cursor()
        # Single quotes: in SQL a double-quoted token is an identifier.
        cursor.execute("SELECT name from sqlite_master where type = 'table'")
        return [item['name'] for item in cursor.fetchall()]

    def getDict(self):
        """Return ``{table name: rows}`` for every table in the database."""
        return {table: self.getBase(table) for table in self.getTables()}

    def execute(self, req, params=()):
        """Run the statement *req* and return all rows it produced.

        Args:
            req: SQL text, optionally containing ``?`` placeholders.
            params: Values bound to the placeholders.

        Returns:
            The fetched rows (an empty list for statements without a result).
        """
        cursor = self.ConnectedFile.cursor()
        cursor.execute(req, params)
        # Fetch before committing so the statement has finished producing
        # rows by the time the transaction is closed.
        rows = cursor.fetchall()
        if self.act_commit:
            self.ConnectedFile.commit()
        return rows
def formingTable(dictonary, path):
    """Placeholder: still in development, does nothing and returns ``None``."""