From a9a08551dd926a70e0d06ee189950f9e42881eee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wichli=C5=84ski?= Date: Thu, 29 Sep 2022 04:38:40 +0200 Subject: [PATCH 1/6] working basic cache --- cs3api4lab/api/cs3_file_api.py | 84 +++++++++++++++++++++++------- cs3api4lab/api/share_api_facade.py | 18 +++++-- cs3api4lab/api/storage_api.py | 4 ++ cs3api4lab/utils/file_utils.py | 6 +++ cs3api4lab/utils/sqlquerycache.py | 72 +++++++++++++++++++++++++ 5 files changed, 160 insertions(+), 24 deletions(-) create mode 100644 cs3api4lab/utils/sqlquerycache.py diff --git a/cs3api4lab/api/cs3_file_api.py b/cs3api4lab/api/cs3_file_api.py index cba271f7..9cc2edd8 100644 --- a/cs3api4lab/api/cs3_file_api.py +++ b/cs3api4lab/api/cs3_file_api.py @@ -14,6 +14,8 @@ import cs3.gateway.v1beta1.gateway_api_pb2_grpc as cs3gw_grpc import cs3.rpc.v1beta1.code_pb2 as cs3code import cs3.storage.provider.v1beta1.provider_api_pb2 as cs3sp +from google.protobuf.json_format import MessageToJson + from cs3api4lab.exception.exceptions import ResourceNotFoundError from cs3api4lab.utils.file_utils import FileUtils @@ -23,6 +25,7 @@ from cs3api4lab.auth.channel_connector import ChannelConnector from cs3api4lab.config.config_manager import Cs3ConfigManager from cs3api4lab.api.lock_manager import LockManager +from cs3api4lab.utils.sqlquerycache import SqlQueryCache class Cs3FileApi: @@ -31,6 +34,7 @@ class Cs3FileApi: auth = None config = None lock_manager = None + sql_cache = None def __init__(self, log): self.log = log @@ -41,9 +45,8 @@ def __init__(self, log): intercept_channel = grpc.intercept_channel(channel, auth_interceptor) self.cs3_api = cs3gw_grpc.GatewayAPIStub(intercept_channel) self.storage_api = StorageApi(log) - self.lock_manager = LockManager(log) - + self.sql_cache = SqlQueryCache() return def mount_point(self): @@ -64,27 +67,51 @@ def stat_info(self, file_path, endpoint='/'): """ time_start = time.time() stat = self.storage_api.stat(file_path, endpoint) - if stat.status.code == cs3code.CODE_OK: - time_end = time.time() - self.log.info('msg="Invoked stat" fileid="%s" elapsedTimems="%.1f"' % (file_path, (time_end - time_start) * 1000)) - return { - 'inode': {'storage_id': stat.info.id.storage_id, - 'opaque_id': stat.info.id.opaque_id}, - 'filepath': stat.info.path, - 'userid': stat.info.owner.opaque_id, - 'size': stat.info.size, - 'mtime': stat.info.mtime.seconds, - 'type': stat.info.type, - 'mime_type': stat.info.mime_type, - 'idp': stat.info.owner.idp, - 'permissions': stat.info.permission_set - } - elif stat.status.code == cs3code.CODE_NOT_FOUND: + time_end = time.time() + self.log.info( + 'msg="Invoked stat" fileid="%s" elapsedTimems="%.1f"' % (file_path, (time_end - time_start) * 1000)) + + if stat.status.code == cs3code.CODE_NOT_FOUND: self.log.info('msg="Failed stat" fileid="%s" reason="%s"' % (file_path, stat.status.message)) raise FileNotFoundError(stat.status.message + ", file " + file_path) else: - self._handle_error(stat) - + self.sql_cache.save_item( + storage_id=stat.info.id.storage_id, + opaque_id=stat.info.id.storage_id, + stored_value=MessageToJson(stat) + ) + return self._stat_output(stat) + + def stat_info_by_resource(self, opaque_id, storage_id): + """ + Stat a file and returns (size, mtime) as well as other extended info using the given userid as access token. + Note that endpoint here means the storage id. Note that fileid can be either a path (which MUST begin with /) + or an id (which MUST NOT start with a /). + """ + time_start = time.time() + + if self.sql_cache.item_exists(storage_id=storage_id, opaque_id=opaque_id): + print('from cache ') + stat = self.sql_cache.get_stored_value(storage_id=storage_id, opaque_id=opaque_id) + else: + print('from request') + stat = self.storage_api.stat_by_resource(opaque_id, storage_id) + if stat.status.code == cs3code.CODE_NOT_FOUND: + self.log.info( + 'msg="Failed stat" fileid="%s" storageid="%s" reason="%s"' % + (opaque_id, storage_id, stat.status.message) + ) + raise FileNotFoundError(stat.status.message + ", file " + stat.info.path) + else: + self.sql_cache.save_item(storage_id=storage_id, opaque_id=opaque_id, stored_value=MessageToJson(stat)) + + time_end = time.time() + self.log.info( + 'msg="Invoked stat" fileid="%s" storageid="%s" elapsedTimems="%.1f"' % ( + opaque_id, storage_id, (time_end - time_start) * 1000)) + + return self._stat_output(stat) + def read_file(self, file_path, endpoint=None): """ Read a file using the given userid as access token. @@ -254,6 +281,23 @@ def create_directory(self, path, endpoint=None): def get_home_dir(self): return self.config.home_dir if self.config.home_dir else "" + def _stat_output(self, stat): + if stat.status.code == cs3code.CODE_OK: + return { + 'inode': {'storage_id': stat.info.id.storage_id, + 'opaque_id': stat.info.id.opaque_id}, + 'filepath': stat.info.path, + 'userid': stat.info.owner.opaque_id, + 'size': stat.info.size, + 'mtime': stat.info.mtime.seconds, + 'type': stat.info.type, + 'mime_type': stat.info.mime_type, + 'idp': stat.info.owner.idp, + 'permissions': stat.info.permission_set + } + else: + self._handle_error(stat) + def _handle_error(self, response): self.log.error(response) raise Exception("Incorrect server response: " + diff --git a/cs3api4lab/api/share_api_facade.py b/cs3api4lab/api/share_api_facade.py index 75fd0850..55c22b5e 100644 --- a/cs3api4lab/api/share_api_facade.py +++ b/cs3api4lab/api/share_api_facade.py @@ -1,4 +1,5 @@ import urllib.parse +import time import cs3.ocm.provider.v1beta1.provider_api_pb2_grpc as ocm_provider_api_grpc import cs3.storage.provider.v1beta1.resources_pb2 as Resources @@ -19,6 +20,7 @@ from cs3api4lab.api.storage_api import StorageApi from cs3api4lab.exception.exceptions import OCMDisabledError + class ShareAPIFacade: def __init__(self, log): self.log = log @@ -34,6 +36,7 @@ def __init__(self, log): self.ocm_share_api = Cs3OcmShareApi(log) self.storage_api = StorageApi(log) + return def create(self, endpoint, file_path, opaque_id, idp, role=Role.EDITOR, grantee_type=Grantee.USER, reshare=True): @@ -102,12 +105,18 @@ def list_shares(self): :return: created shares and OCM shares combined and mapped to Jupyter model :rtype: dict """ + time_start = time.time() share_list = self.share_api.list() if self.config.enable_ocm: ocm_share_list = self.ocm_share_api.list() else: ocm_share_list = None - return self.map_shares(share_list, ocm_share_list) + + mapped_shares = self.map_shares(share_list, ocm_share_list) + time_end = time.time() + print('shares times:', time_end - time_start) + + return mapped_shares def list_received(self, status=None): """ @@ -205,9 +214,9 @@ def map_shares_to_model(self, list_response, received=False): share = share.share try: user = self.user_api.get_user_info(share.owner.idp, share.owner.opaque_id) - stat = self.file_api.stat_info(urllib.parse.unquote(share.resource_id.opaque_id), - share.resource_id.storage_id) # todo remove this and use storage_logic - # stat = self.storage_logic.stat_info(urllib.parse.unquote(share.resource_id.opaque_id), share.resource_id.storage_id) + # if not self.stat_cache.item_exists(share.resource_id.storage_id, share.resource_id.opaque_id): + stat = self.file_api.stat_info_by_resource(urllib.parse.unquote(share.resource_id.opaque_id), + share.resource_id.storage_id) if stat['type'] == Resources.RESOURCE_TYPE_FILE: if hasattr(share.permissions.permissions, @@ -226,6 +235,7 @@ def map_shares_to_model(self, list_response, received=False): model['writable'] = True if ShareUtils.map_permissions_to_role( share.permissions.permissions) == 'editor' else False except Exception as e: + print(e) self.log.error("Unable to map share " + share.resource_id.opaque_id + ", " + e.__str__()) continue diff --git a/cs3api4lab/api/storage_api.py b/cs3api4lab/api/storage_api.py index ea780959..6eff03b1 100644 --- a/cs3api4lab/api/storage_api.py +++ b/cs3api4lab/api/storage_api.py @@ -46,6 +46,10 @@ def stat(self, file_path, endpoint='/'): ref = FileUtils.get_reference(file_path, endpoint) return self._stat_internal(ref) + def stat_by_resource(self, opaque_id, storage_id): + ref = FileUtils.get_reference_by_resource(opaque_id=opaque_id, storage_id=storage_id) + return self._stat_internal(ref) + def _stat_internal(self, ref): return self.cs3_api.Stat(request=cs3sp.StatRequest(ref=ref), metadata=[('x-access-token', self.auth.authenticate())]) diff --git a/cs3api4lab/utils/file_utils.py b/cs3api4lab/utils/file_utils.py index 2bf6d716..aba114ce 100644 --- a/cs3api4lab/utils/file_utils.py +++ b/cs3api4lab/utils/file_utils.py @@ -18,6 +18,12 @@ def get_reference(file_id, endpoint=None): # assume we have an opaque fileid return storage_provider.Reference(resource_id=storage_provider.ResourceId(storage_id=endpoint, opaque_id=file_id)) + @staticmethod + def get_reference_by_resource(opaque_id, storage_id): + return storage_provider.Reference( + resource_id=storage_provider.ResourceId(storage_id=storage_id, opaque_id=opaque_id) + ) + @staticmethod def check_and_transform_file_path(file_id): config = Cs3ConfigManager().get_config() #note: can cause problems in tests because of the config, it should be passed as an argument diff --git a/cs3api4lab/utils/sqlquerycache.py b/cs3api4lab/utils/sqlquerycache.py new file mode 100644 index 00000000..db32ff97 --- /dev/null +++ b/cs3api4lab/utils/sqlquerycache.py @@ -0,0 +1,72 @@ +import json +import time +from datetime import datetime, timedelta +import sqlite3 + + +class SqlQueryCache: + + _cursor = None + _connection = None + + def __init__(self): + print("================== starting cache ================= ") + + @property + def connection(self): + if self._connection is None: + # Set isolation level to None to autocommit all changes to the database. + # self._connection = sqlite3.connect("./test.db", check_same_thread=False, isolation_level=None) + self._connection = sqlite3.connect(":memory:", check_same_thread=False, isolation_level=None) + self._connection.row_factory = sqlite3.Row + return self._connection + + @property + def cursor(self): + """Start a cursor and create a database called 'session'""" + if self._cursor is None: + self._cursor = self.connection.cursor() + self._cursor.execute( + """CREATE TABLE IF NOT EXISTS cached_stat + (storage_id, opaque_id, stored_value, btime)""" + ) + + return self._cursor + + def close(self): + """Close the sqlite connection""" + if self._cursor is not None: + self._cursor.close() + self._cursor = None + + def item_exists(self, storage_id, opaque_id): + """Check to see if the session of a given name exists""" + self.cursor.execute("SELECT * FROM cached_stat WHERE storage_id=? AND opaque_id=?", (storage_id, opaque_id)) + row = self.cursor.fetchone() + exists = False + print('-----------------') + + if row is not None: + btime = row['btime'] + exists = datetime.fromtimestamp(btime) + timedelta(minutes=1) > datetime.fromtimestamp(time.time()) + if not exists: + self.cursor.execute("DELETE FROM cached_stat WHERE storage_id=? AND opaque_id=?", (storage_id, opaque_id)) + + print('item exists:', exists) + return exists + + def save_item(self, storage_id=None, opaque_id=None, stored_value=None): + btime = datetime.timestamp(datetime.now()) + # stored_value = json.dumps(stored_value) + if not self.item_exists(storage_id, opaque_id): + self.cursor.execute( + "INSERT INTO cached_stat VALUES (?,?,?,?)", (storage_id, opaque_id, stored_value, btime) + ) + + def get_stored_value(self, storage_id, opaque_id): + self.cursor.execute("SELECT * FROM cached_stat WHERE storage_id=? AND opaque_id=?", (storage_id, opaque_id)) + row = self.cursor.fetchone() + return json.loads(row['stored_value']) if row is not None else None + + def __del__(self): + self.close() From baf9f1c9ad982814b9ee43e0622ca2f16851968a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wichli=C5=84ski?= Date: Thu, 29 Sep 2022 09:54:27 +0200 Subject: [PATCH 2/6] cache: working lib --- cs3api4lab/api/cs3_file_api.py | 17 ++-- cs3api4lab/utils/sqlquerycache.py | 147 +++++++++++++++--------------- 2 files changed, 85 insertions(+), 79 deletions(-) diff --git a/cs3api4lab/api/cs3_file_api.py b/cs3api4lab/api/cs3_file_api.py index 9cc2edd8..da8729f5 100644 --- a/cs3api4lab/api/cs3_file_api.py +++ b/cs3api4lab/api/cs3_file_api.py @@ -14,7 +14,6 @@ import cs3.gateway.v1beta1.gateway_api_pb2_grpc as cs3gw_grpc import cs3.rpc.v1beta1.code_pb2 as cs3code import cs3.storage.provider.v1beta1.provider_api_pb2 as cs3sp -from google.protobuf.json_format import MessageToJson from cs3api4lab.exception.exceptions import ResourceNotFoundError @@ -78,7 +77,7 @@ def stat_info(self, file_path, endpoint='/'): self.sql_cache.save_item( storage_id=stat.info.id.storage_id, opaque_id=stat.info.id.storage_id, - stored_value=MessageToJson(stat) + stored_value=stat ) return self._stat_output(stat) @@ -89,12 +88,16 @@ def stat_info_by_resource(self, opaque_id, storage_id): or an id (which MUST NOT start with a /). """ time_start = time.time() - + # if self.sql_cache.item_exists(storage_id=storage_id, opaque_id=opaque_id): - print('from cache ') - stat = self.sql_cache.get_stored_value(storage_id=storage_id, opaque_id=opaque_id) + print('from cache') + stat = self.sql_cache.get_stored_value( + storage_id=storage_id, + opaque_id=opaque_id, + message=cs3sp.StatResponse() + ) else: - print('from request') + print('from request ') stat = self.storage_api.stat_by_resource(opaque_id, storage_id) if stat.status.code == cs3code.CODE_NOT_FOUND: self.log.info( @@ -103,7 +106,7 @@ def stat_info_by_resource(self, opaque_id, storage_id): ) raise FileNotFoundError(stat.status.message + ", file " + stat.info.path) else: - self.sql_cache.save_item(storage_id=storage_id, opaque_id=opaque_id, stored_value=MessageToJson(stat)) + self.sql_cache.save_item(storage_id=storage_id, opaque_id=opaque_id, stored_value=stat) time_end = time.time() self.log.info( diff --git a/cs3api4lab/utils/sqlquerycache.py b/cs3api4lab/utils/sqlquerycache.py index db32ff97..712a9b71 100644 --- a/cs3api4lab/utils/sqlquerycache.py +++ b/cs3api4lab/utils/sqlquerycache.py @@ -1,72 +1,75 @@ -import json -import time -from datetime import datetime, timedelta -import sqlite3 - - -class SqlQueryCache: - - _cursor = None - _connection = None - - def __init__(self): - print("================== starting cache ================= ") - - @property - def connection(self): - if self._connection is None: - # Set isolation level to None to autocommit all changes to the database. - # self._connection = sqlite3.connect("./test.db", check_same_thread=False, isolation_level=None) - self._connection = sqlite3.connect(":memory:", check_same_thread=False, isolation_level=None) - self._connection.row_factory = sqlite3.Row - return self._connection - - @property - def cursor(self): - """Start a cursor and create a database called 'session'""" - if self._cursor is None: - self._cursor = self.connection.cursor() - self._cursor.execute( - """CREATE TABLE IF NOT EXISTS cached_stat - (storage_id, opaque_id, stored_value, btime)""" - ) - - return self._cursor - - def close(self): - """Close the sqlite connection""" - if self._cursor is not None: - self._cursor.close() - self._cursor = None - - def item_exists(self, storage_id, opaque_id): - """Check to see if the session of a given name exists""" - self.cursor.execute("SELECT * FROM cached_stat WHERE storage_id=? AND opaque_id=?", (storage_id, opaque_id)) - row = self.cursor.fetchone() - exists = False - print('-----------------') - - if row is not None: - btime = row['btime'] - exists = datetime.fromtimestamp(btime) + timedelta(minutes=1) > datetime.fromtimestamp(time.time()) - if not exists: - self.cursor.execute("DELETE FROM cached_stat WHERE storage_id=? AND opaque_id=?", (storage_id, opaque_id)) - - print('item exists:', exists) - return exists - - def save_item(self, storage_id=None, opaque_id=None, stored_value=None): - btime = datetime.timestamp(datetime.now()) - # stored_value = json.dumps(stored_value) - if not self.item_exists(storage_id, opaque_id): - self.cursor.execute( - "INSERT INTO cached_stat VALUES (?,?,?,?)", (storage_id, opaque_id, stored_value, btime) - ) - - def get_stored_value(self, storage_id, opaque_id): - self.cursor.execute("SELECT * FROM cached_stat WHERE storage_id=? AND opaque_id=?", (storage_id, opaque_id)) - row = self.cursor.fetchone() - return json.loads(row['stored_value']) if row is not None else None - - def __del__(self): - self.close() +from google.protobuf.json_format import MessageToJson, Parse +import time +from datetime import datetime, timedelta +import sqlite3 + + +class SqlQueryCache: + + _cursor = None + _connection = None + + def __init__(self): + print("================== starting cache ================= ") + + @property + def connection(self): + if self._connection is None: + # Set isolation level to None to autocommit all changes to the database. + self._connection = sqlite3.connect("./test.db", check_same_thread=False, isolation_level=None) + # self._connection = sqlite3.connect(":memory:", check_same_thread=False, isolation_level=None) + self._connection.row_factory = sqlite3.Row + return self._connection + + @property + def cursor(self): + """Start a cursor and create a database called 'session'""" + if self._cursor is None: + self._cursor = self.connection.cursor() + self._cursor.execute( + """CREATE TABLE IF NOT EXISTS cached_stat + (storage_id, opaque_id, stored_value, btime)""" + ) + + return self._cursor + + def close(self): + """Close the sqlite connection""" + if self._cursor is not None: + self._cursor.close() + self._cursor = None + + def item_exists(self, storage_id, opaque_id): + """Check to see if the session of a given name exists""" + self.cursor.execute("SELECT * FROM cached_stat WHERE storage_id=? AND opaque_id=?", (storage_id, opaque_id)) + row = self.cursor.fetchone() + exists = False + print('-----------------') + + if row is not None: + btime = row['btime'] + exists = datetime.fromtimestamp(btime) + timedelta(minutes=3) > datetime.fromtimestamp(time.time()) + if not exists: + self.cursor.execute("DELETE FROM cached_stat WHERE storage_id=? AND opaque_id=?", (storage_id, opaque_id)) + + print('item exists:', exists) + return exists + + def save_item(self, storage_id=None, opaque_id=None, stored_value=None): + btime = datetime.timestamp(datetime.now()) + stored_value = MessageToJson(stored_value) + if not self.item_exists(storage_id, opaque_id): + self.cursor.execute( + "INSERT INTO cached_stat VALUES (?,?,?,?)", (storage_id, opaque_id, stored_value, btime) + ) + + def get_stored_value(self, storage_id, opaque_id, message): + self.cursor.execute("SELECT * FROM cached_stat WHERE storage_id=? AND opaque_id=?", (storage_id, opaque_id)) + row = self.cursor.fetchone() + if row is not None: + return Parse(row['stored_value'], message) + else: + return None + + def __del__(self): + self.close() From a12bce2829b55dd9747394ec7abab93810f3889b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wichli=C5=84ski?= Date: Tue, 4 Oct 2022 14:35:53 +0200 Subject: [PATCH 3/6] cache: clean old records --- cs3api4lab/utils/sqlquerycache.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/cs3api4lab/utils/sqlquerycache.py b/cs3api4lab/utils/sqlquerycache.py index 712a9b71..0928199f 100644 --- a/cs3api4lab/utils/sqlquerycache.py +++ b/cs3api4lab/utils/sqlquerycache.py @@ -63,6 +63,12 @@ def save_item(self, storage_id=None, opaque_id=None, stored_value=None): "INSERT INTO cached_stat VALUES (?,?,?,?)", (storage_id, opaque_id, stored_value, btime) ) + def clean_old_records(self): + older_than = datetime.now() + timedelta(minutes=-3) + older_than = datetime.timestamp(older_than) + self.cursor.execute("DELETE FROM cached_stat WHERE btime < ?", (older_than,)) + return + def get_stored_value(self, storage_id, opaque_id, message): self.cursor.execute("SELECT * FROM cached_stat WHERE storage_id=? AND opaque_id=?", (storage_id, opaque_id)) row = self.cursor.fetchone() @@ -72,4 +78,5 @@ def get_stored_value(self, storage_id, opaque_id, message): return None def __del__(self): + self.clean_old_records() self.close() From 2aacef235fdabe77b7124da497d8a71ba9fdbcc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wichli=C5=84ski?= Date: Tue, 4 Oct 2022 15:37:43 +0200 Subject: [PATCH 4/6] cache: clean old records --- cs3api4lab/api/cs3_file_api.py | 2 +- cs3api4lab/config/config_manager.py | 24 +++++++++++++- cs3api4lab/utils/sqlquerycache.py | 45 +++++++++++++++++--------- jupyter-config/jupyter_cs3_config.json | 5 ++- 4 files changed, 58 insertions(+), 18 deletions(-) diff --git a/cs3api4lab/api/cs3_file_api.py b/cs3api4lab/api/cs3_file_api.py index da8729f5..6aeaa62f 100644 --- a/cs3api4lab/api/cs3_file_api.py +++ b/cs3api4lab/api/cs3_file_api.py @@ -45,7 +45,7 @@ def __init__(self, log): self.cs3_api = cs3gw_grpc.GatewayAPIStub(intercept_channel) self.storage_api = StorageApi(log) self.lock_manager = LockManager(log) - self.sql_cache = SqlQueryCache() + self.sql_cache = SqlQueryCache(config=self.config) return def mount_point(self): diff --git a/cs3api4lab/config/config_manager.py b/cs3api4lab/config/config_manager.py index 7f6ea7cf..a4f1f201 100644 --- a/cs3api4lab/config/config_manager.py +++ b/cs3api4lab/config/config_manager.py @@ -80,6 +80,15 @@ class Config(LoggingConfigurable): oauth_token = Unicode( config=True, allow_none=True, help="""OAuth token""" ) + stat_cache_enabled = Bool( + config=True, default_value=False, help="""Stat caching is enabled""" + ) + stat_cache_file = Unicode( + config=True, default_value="./tmp_cache_file.db", allow_none=True, help="""Path to db file""" + ) + stat_cache_time = CInt( + config=True, default_value=180, allow_none=True, help="""Cache ttl in seconds""" + ) @default("reva_host") def _reva_host_default(self): @@ -180,6 +189,18 @@ def _oauth_file_default(self): def _oauth_token_default(self): return self._get_config_value("oauth_token") + @default("stat_cache_enabled") + def _stat_cache_enabled_default(self): + return self._get_config_value("stat_cache_enabled") in ["true", True] + + @default("stat_cache_file") + def _stat_cache_file_default(self): + return self._get_config_value("stat_cache_file") + + @default("stat_cache_time") + def _stat_cache_time_default(self): + return self._get_config_value("stat_cache_time") + def _get_config_value(self, key): env = os.getenv("CS3_" + key.upper()) if env: @@ -235,7 +256,8 @@ def _file_config(self, key): "eos_file": None, "eos_token": None, "oauth_file": None, - "oauth_token": None + "oauth_token": None, + "stat_cache_enabled": False, } diff --git a/cs3api4lab/utils/sqlquerycache.py b/cs3api4lab/utils/sqlquerycache.py index 0928199f..402f558d 100644 --- a/cs3api4lab/utils/sqlquerycache.py +++ b/cs3api4lab/utils/sqlquerycache.py @@ -5,19 +5,21 @@ class SqlQueryCache: - _cursor = None _connection = None + config = None - def __init__(self): - print("================== starting cache ================= ") + def __init__(self, config): + self.config = config @property def connection(self): if self._connection is None: - # Set isolation level to None to autocommit all changes to the database. - self._connection = sqlite3.connect("./test.db", check_same_thread=False, isolation_level=None) - # self._connection = sqlite3.connect(":memory:", check_same_thread=False, isolation_level=None) + self._connection = sqlite3.connect( + self.config.stat_cache_file, + check_same_thread=False, + isolation_level=None # Set isolation level to None to autocommit all changes to the database. + ) self._connection.row_factory = sqlite3.Row return self._connection @@ -28,10 +30,10 @@ def cursor(self): self._cursor = self.connection.cursor() self._cursor.execute( """CREATE TABLE IF NOT EXISTS cached_stat - (storage_id, opaque_id, stored_value, btime)""" + (storage_id, opaque_id, stored_value, ctime)""" ) - return self._cursor + return self._cursor if self.config.stat_cache_enabled else None def close(self): """Close the sqlite connection""" @@ -40,6 +42,9 @@ def close(self): self._cursor = None def item_exists(self, storage_id, opaque_id): + if not self.config.stat_cache_enabled: + return False + """Check to see if the session of a given name exists""" self.cursor.execute("SELECT * FROM cached_stat WHERE storage_id=? AND opaque_id=?", (storage_id, opaque_id)) row = self.cursor.fetchone() @@ -47,29 +52,39 @@ def item_exists(self, storage_id, opaque_id): print('-----------------') if row is not None: - btime = row['btime'] - exists = datetime.fromtimestamp(btime) + timedelta(minutes=3) > datetime.fromtimestamp(time.time()) + expiration_time = datetime.fromtimestamp(row['ctime']) + timedelta(seconds=self.config.stat_cache_time) + exists = expiration_time > datetime.fromtimestamp(time.time()) if not exists: - self.cursor.execute("DELETE FROM cached_stat WHERE storage_id=? AND opaque_id=?", (storage_id, opaque_id)) + self.cursor.execute("DELETE FROM cached_stat WHERE storage_id=? AND opaque_id=?", + (storage_id, opaque_id)) print('item exists:', exists) return exists def save_item(self, storage_id=None, opaque_id=None, stored_value=None): - btime = datetime.timestamp(datetime.now()) + if not self.config.stat_cache_enabled: + return False + + ctime = datetime.timestamp(datetime.now()) stored_value = MessageToJson(stored_value) if not self.item_exists(storage_id, opaque_id): self.cursor.execute( - "INSERT INTO cached_stat VALUES (?,?,?,?)", (storage_id, opaque_id, stored_value, btime) + "INSERT INTO cached_stat VALUES (?,?,?,?)", (storage_id, opaque_id, stored_value, ctime) ) def clean_old_records(self): - older_than = datetime.now() + timedelta(minutes=-3) + if not self.config.stat_cache_enabled: + return False + + older_than = datetime.now() + timedelta(seconds=-self.config.stat_cache_time) older_than = datetime.timestamp(older_than) - self.cursor.execute("DELETE FROM cached_stat WHERE btime < ?", (older_than,)) + self.cursor.execute("DELETE FROM cached_stat WHERE ctime < ?", (older_than,)) return def get_stored_value(self, storage_id, opaque_id, message): + if not self.config.stat_cache_enabled: + return None + self.cursor.execute("SELECT * FROM cached_stat WHERE storage_id=? AND opaque_id=?", (storage_id, opaque_id)) row = self.cursor.fetchone() if row is not None: diff --git a/jupyter-config/jupyter_cs3_config.json b/jupyter-config/jupyter_cs3_config.json index 7172bc24..d6aa0d19 100644 --- a/jupyter-config/jupyter_cs3_config.json +++ b/jupyter-config/jupyter_cs3_config.json @@ -16,6 +16,9 @@ "locks_expiration_time": 150, "tus_enabled": false, "enable_ocm": false, - "shared_folder": "MyShares" + "shared_folder": "MyShares", + "stat_cache_enabled": true, + "stat_cache_file": "./test.db", + "stat_cache_time": 180 } } From 4b614eee0bf8c4268cf6f7bef974daff8876f947 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wichli=C5=84ski?= Date: Tue, 4 Oct 2022 18:38:24 +0200 Subject: [PATCH 5/6] fix tests --- cs3api4lab/api/cs3_file_api.py | 2 -- cs3api4lab/tests/extensions.py | 3 +++ .../jupyter-config/jupyter_cs3_config.json | 3 ++- cs3api4lab/tests/share_test_base.py | 22 ++++++++++--------- cs3api4lab/utils/sqlquerycache.py | 5 ++++- 5 files changed, 21 insertions(+), 14 deletions(-) diff --git a/cs3api4lab/api/cs3_file_api.py b/cs3api4lab/api/cs3_file_api.py index 6aeaa62f..fac74bd5 100644 --- a/cs3api4lab/api/cs3_file_api.py +++ b/cs3api4lab/api/cs3_file_api.py @@ -90,14 +90,12 @@ def stat_info_by_resource(self, opaque_id, storage_id): time_start = time.time() # if self.sql_cache.item_exists(storage_id=storage_id, opaque_id=opaque_id): - print('from cache') stat = self.sql_cache.get_stored_value( storage_id=storage_id, opaque_id=opaque_id, message=cs3sp.StatResponse() ) else: - print('from request ') stat = self.storage_api.stat_by_resource(opaque_id, storage_id) if stat.status.code == cs3code.CODE_NOT_FOUND: self.log.info( diff --git a/cs3api4lab/tests/extensions.py b/cs3api4lab/tests/extensions.py index 16bb4aa4..917ff1d6 100644 --- a/cs3api4lab/tests/extensions.py +++ b/cs3api4lab/tests/extensions.py @@ -22,6 +22,8 @@ import cs3.sharing.ocm.v1beta1.ocm_api_pb2_grpc as ocm_api_grpc import cs3.gateway.v1beta1.gateway_api_pb2_grpc as grpc_gateway +from utils.sqlquerycache import SqlQueryCache + class ExtStorageApi(StorageApi): def __init__(self, log, config): @@ -76,6 +78,7 @@ def __init__(self, log, config) -> None: self.cs3_api = cs3gw_grpc.GatewayAPIStub(intercept_channel) self.lock_manager = ExtLockManager(log, config) self.storage_api = ExtStorageApi(log, config) + self.sql_cache = SqlQueryCache(config=self.config) class ExtCs3ShareApi(Cs3ShareApi): diff --git a/cs3api4lab/tests/jupyter-config/jupyter_cs3_config.json b/cs3api4lab/tests/jupyter-config/jupyter_cs3_config.json index 5ca9e15b..528d5c19 100644 --- a/cs3api4lab/tests/jupyter-config/jupyter_cs3_config.json +++ b/cs3api4lab/tests/jupyter-config/jupyter_cs3_config.json @@ -16,6 +16,7 @@ "locks_expiration_time": 10, "tus_enabled": false, "enable_ocm": false, - "shared_folder": "MyShares" + "shared_folder": "MyShares", + "stat_cache_enabled": false } } diff --git a/cs3api4lab/tests/share_test_base.py b/cs3api4lab/tests/share_test_base.py index 45d19b77..4f4cfa21 100644 --- a/cs3api4lab/tests/share_test_base.py +++ b/cs3api4lab/tests/share_test_base.py @@ -5,6 +5,7 @@ import cs3.rpc.v1beta1.code_pb2 as cs3code from collections import namedtuple + class ShareTestBase: storage_id = '123e4567-e89b-12d3-a456-426655440000' receiver_role = 'editor' @@ -34,10 +35,11 @@ def setUp(self): "authenticator_class": "cs3api4lab.auth.RevaPassword", "client_id": "marie", "client_secret": "radioactivity", - "locks_expiration_time": 10, - "tus_enabled": True, - "enable_ocm": False - } + "locks_expiration_time": 10, + "tus_enabled": True, + "enable_ocm": False, + "stat_cache_enabled": False + } marie_ext_config = namedtuple('MarieConfig', marie_ext_config)(**marie_ext_config) richard_local_config = { @@ -54,9 +56,10 @@ def setUp(self): "authenticator_class": "cs3api4lab.auth.RevaPassword", "client_id": "richard", "client_secret": "superfluidity", - "locks_expiration_time": 10, - "tus_enabled": True, - "enable_ocm": False + "locks_expiration_time": 10, + "tus_enabled": True, + "enable_ocm": False, + "stat_cache_enabled": False, } richard_local_config = namedtuple('richardConfig', richard_local_config)(**richard_local_config) @@ -152,7 +155,6 @@ def clear_locks_on_file(self, file, endpoint='/'): for lock in list(metadata.keys()): self.storage_api.set_metadata({lock: "{}"}, file, endpoint) - def remove_test_share(self, user, share_id): if user == 'einstein': self.share_api.remove(share_id) @@ -228,9 +230,9 @@ def remove_share_and_file_by_path(self, user, file_path): stat = storage.stat(file_path) if stat.status.code == cs3code.CODE_NOT_FOUND or stat.status.code == cs3code.CODE_INTERNAL: self.create_test_file(user, file_path) - #todo the code above won't be necessary after https://github.com/cs3org/reva/issues/2847 is fixed + # todo the code above won't be necessary after https://github.com/cs3org/reva/issues/2847 is fixed - shares = share_api.list_shares_for_filepath(file_path) #todo this won't work on CERNBOX + shares = share_api.list_shares_for_filepath(file_path) # todo this won't work on CERNBOX if shares: for share in shares: share_api.remove(share['opaque_id']) diff --git a/cs3api4lab/utils/sqlquerycache.py b/cs3api4lab/utils/sqlquerycache.py index 402f558d..df2dcfb1 100644 --- a/cs3api4lab/utils/sqlquerycache.py +++ b/cs3api4lab/utils/sqlquerycache.py @@ -25,6 +25,9 @@ def connection(self): @property def cursor(self): + if not self.config.stat_cache_enabled: + return None + """Start a cursor and create a database called 'session'""" if self._cursor is None: self._cursor = self.connection.cursor() @@ -33,7 +36,7 @@ def cursor(self): (storage_id, opaque_id, stored_value, ctime)""" ) - return self._cursor if self.config.stat_cache_enabled else None + return self._cursor def close(self): """Close the sqlite connection""" From ec36186391b850445ca87576b9192083d9b2ec66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wichli=C5=84ski?= Date: Tue, 4 Oct 2022 18:48:27 +0200 Subject: [PATCH 6/6] fix tests --- cs3api4lab/tests/extensions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cs3api4lab/tests/extensions.py b/cs3api4lab/tests/extensions.py index 917ff1d6..35af4312 100644 --- a/cs3api4lab/tests/extensions.py +++ b/cs3api4lab/tests/extensions.py @@ -22,7 +22,7 @@ import cs3.sharing.ocm.v1beta1.ocm_api_pb2_grpc as ocm_api_grpc import cs3.gateway.v1beta1.gateway_api_pb2_grpc as grpc_gateway -from utils.sqlquerycache import SqlQueryCache +from cs3api4lab.utils.sqlquerycache import SqlQueryCache class ExtStorageApi(StorageApi):