How to Implement a Custom Memory & Redis Cache for an API Gateway

Justice NDOU
8 min read · Apr 8, 2023

--

Introduction

I created my own caching library so that I could stay in control of the caching mechanism for my API Gateway.

Caching is a crucial component of a high-performance API Gateway, which is an important tool for modern-day developers. In this article, we will discuss how to implement a Python-based cache for an API Gateway using asyncio, threading, and Redis.

Our implementation provides thread-safe caching and automatic cache eviction when full, with the added benefit of being able to use Redis as the cache storage system. We will also discuss why building it was a great learning experience and why it is a good fit for a Python-based gateway.

What is caching and why is it important for an API Gateway?

Caching is the process of storing frequently accessed data in memory to improve application performance. When a request is made to an API, it takes time to process and return a response. This time is known as the response time or latency. Caching can help reduce response time by retrieving data from memory instead of querying it from a database or a service.

API Gateways act as intermediaries between clients and services, providing security, scalability, and performance benefits. They can also cache responses to improve performance and reduce the load on services.

Now that we know why caching is important for an API Gateway, let’s start implementing a Python-based cache.

Implementing a Python-based Cache for an API Gateway

Our implementation of a Python-based cache for an API Gateway uses asyncio, threading, and Redis. We will start by defining a class called Cache.

Version 0.0.1 of this implementation can be found on GitHub:

EOD Stock API — API Gateway

Custom Cache Class

class Cache:
    """
    A class to handle caching of data, both in-memory and in Redis.
    The class provides thread-safe caching and automatic cache eviction when full.

    :param max_size: The maximum size of the in-memory cache. The default is set in the config file.
    :param expiration_time: The expiration time of each cache entry, in seconds. The default is set in the config file.
    :param use_redis: Indicates whether to use a Redis cache. The default is False.
    """

    def __init__(self, cache_name: str = "mem", max_size: int = MEM_CACHE_SIZE,
                 expiration_time: int = EXPIRATION_TIME, use_redis: bool = False):
        """
        Initializes the cache and creates a Redis client if use_redis=True.
        If Redis fails, the cache will fall back to using the in-memory cache.
        """
        self.max_size = max_size
        self.expiration_time = expiration_time
        self._cache_name = cache_name
        self._cache = {}
        self._cache_lock = threading.Lock()  # named to match its use in _set_mem_cache below
        self._use_redis = use_redis
        self._logger = init_logger(camel_to_snake(self.__class__.__name__))

Our custom cache class will be designed to handle caching of data, both in-memory and in Redis. It will provide thread-safe caching and automatic cache eviction when full. The class will have the following parameters:

  • max_size: The maximum size of the in-memory cache. The default is set in the config file.
  • expiration_time: The expiration time of each cache entry, in seconds. The default is set in the config file.
  • use_redis: Indicates whether to use a Redis cache. The default is False.

The constructor of the Cache class initializes the cache and creates a Redis client if use_redis=True.

If Redis fails, the cache will fall back to using in-memory cache. This ensures that our cache is always available even if there are issues with the Redis server.

See the code below:

    def __init__(self, cache_name: str = "mem", max_size: int = MEM_CACHE_SIZE,
                 expiration_time: int = EXPIRATION_TIME, use_redis: bool = False):
        """
        Initializes the cache and creates a Redis client if use_redis=True.
        If Redis fails, the cache will fall back to using the in-memory cache.
        """
        ...

        if self._use_redis:
            redis_host = config_instance().REDIS_CACHE.CACHE_REDIS_HOST
            redis_port = config_instance().REDIS_CACHE.CACHE_REDIS_PORT
            password = config_instance().REDIS_CACHE.REDIS_PASSWORD
            try:
                self._redis_client = redis.StrictRedis(host=redis_host, port=redis_port, password=password)
                self._redis_client.ping()
                config_instance().DEBUG and self._logger.info("Cache -- Redis connected")
            except (ConnectionError, AuthenticationError):
                config_instance().DEBUG and self._logger.error(msg="Redis failed to connect....")
                self.turn_off_redis()

As you can see, the self._redis_client.ping() call will raise a ConnectionError if a connection cannot be made. In that case, the cache disables Redis by calling the following method:

self.turn_off_redis()

turn_off_redis simply sets the _use_redis flag to False, so all subsequent caching happens in memory.
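Based on that description, the fallback switch can be as simple as flipping the flag. The sketch below uses CacheFallback as an illustrative stand-in for the full Cache class, which holds more state:

```python
class CacheFallback:
    """Illustrative stand-in for the full Cache class, showing only the
    Redis fallback switch."""

    def __init__(self, use_redis: bool = True):
        self._use_redis = use_redis

    def turn_off_redis(self) -> None:
        # Flip the flag so every subsequent get/set skips Redis entirely
        self._use_redis = False
```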

Cache Set Method

The main and most important method of this implementation is the set method, which works as follows.

The set method is responsible for storing the cached value. It takes in three parameters, key, value, and ttl. The key parameter is a unique identifier for the cached value, while the value parameter is the actual value that needs to be cached. The ttl parameter, which is optional, specifies the time-to-live of the cached value in seconds. If the ttl parameter is not provided, the default expiration time of the cache is used.

Before storing the value, it is first serialized using the _serialize_value method, which ensures that the value can be stored as a string. The method then checks if the cache is at maximum capacity. If the cache has reached its maximum size, the _remove_oldest_entry method is called to remove the oldest entry from the cache.

Next, the method attempts to store the value in Redis if the use_redis parameter is set to True. If Redis fails to connect or store the value, the method fails silently, and the value is not stored. If Redis is not being used, the value is stored in-memory.

If storing the value in-memory fails, the method again fails silently. Finally, the method returns None to indicate that the value has been stored successfully.

In conclusion, the set method of our custom Cache class provides an easy way to store cached values either in-memory or in Redis. It automatically handles the serialization of the value, setting the expiration time, and eviction of the oldest entries when the cache is full. By using this method, developers can easily add caching functionality to their applications and improve performance.
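The full set method is not reproduced here, but based on the description above it can be sketched roughly as follows. This is an illustrative sketch, not the repository's exact code: SetSketch is a hypothetical name, the Redis branch is stubbed out, and the helper bodies are assumptions.

```python
import json
import asyncio
import threading
import time
from typing import Any


class SetSketch:
    """A stripped-down sketch of the set path described above: serialize,
    evict when full, then store the entry, failing silently on errors."""

    def __init__(self, max_size: int = 1024, expiration_time: int = 60,
                 use_redis: bool = False):
        self.max_size = max_size
        self.expiration_time = expiration_time
        self._use_redis = use_redis
        self._cache: dict = {}
        self._cache_lock = threading.Lock()

    def _serialize_value(self, value: Any) -> str:
        # Ensure the value can be stored as a string, as described in the text
        return json.dumps(value)

    async def _remove_oldest_entry(self) -> None:
        # Evict the entry with the smallest timestamp (resident the longest)
        with self._cache_lock:
            if self._cache:
                oldest = min(self._cache, key=lambda k: self._cache[k]['timestamp'])
                del self._cache[oldest]

    async def set(self, key: str, value: Any, ttl: int = 0) -> None:
        serialized = self._serialize_value(value)
        if len(self._cache) >= self.max_size:
            await self._remove_oldest_entry()
        try:
            if self._use_redis:
                # the real class would call the Redis client here, e.g.
                # self._redis_client.set(key, serialized, ex=ttl or self.expiration_time)
                pass
            else:
                with self._cache_lock:
                    self._cache[key] = {'value': serialized,
                                        'timestamp': time.monotonic(),
                                        'ttl': ttl or self.expiration_time}
        except Exception:
            pass  # fail silently, as the article describes
```

Note how eviction happens before the store, so the cache never grows past max_size.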

This method is accompanied by the _set_mem_cache method, which, combined with set, completes the caching mechanism employed in our gateway.

The definition of the _set_mem_cache method is as follows.

    async def _set_mem_cache(self, key: str, value: Any, ttl: int = 0):
        """
        **_set_mem_cache**
        private method, never call this code directly
        :param key:
        :param value:
        :param ttl:
        :return:
        """
        with self._cache_lock:
            # If the cache is full, remove the oldest entry
            if len(self._cache) >= self.max_size:
                await self._remove_oldest_entry()

            # create a mem_cache item and set the timestamp and time-to-live
            # based on the given value or the default
            self._cache[key] = {'value': value, 'timestamp': time.monotonic(), 'ttl': ttl}

Note that this method is always called by the set method and should not be used directly, as doing so would break the caching mechanism.
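The _remove_oldest_entry helper that _set_mem_cache relies on is not shown here. Assuming each entry carries the time.monotonic() timestamp written above, a minimal version could look like the following; remove_oldest_entry is written as an illustrative free function over a plain dict, not the repository's actual method:

```python
# Hypothetical sketch of the eviction helper, assuming each entry stores the
# timestamp written by _set_mem_cache; the method in the repository may differ.
def remove_oldest_entry(cache: dict) -> None:
    if not cache:
        return
    # The entry with the smallest timestamp has been resident the longest
    oldest_key = min(cache, key=lambda k: cache[k]['timestamp'])
    del cache[oldest_key]
```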

The last method, and the second most important, is the get method.

Get Cache Method.

    # 1 second timeout default
    async def get(self, key: str, timeout=1) -> Any:
        """
        *GET*
        NOTE: this method will time out in 1 second if no value is returned from any cache.
        Retrieve the value associated with the given key within the allocated time.
        If use_redis=True the value is retrieved from Redis, but only if that key is not also in local memory.

        :param timeout: timeout in seconds; if the time expires the method will return None
        :param key: the key used to look up the cached value
        """

        async def _async_redis_get(get: Callable, _key: str):
            """async stub to fetch data from redis"""
            return get(_key)

        try:
            # Wait for the result of the memcache lookup with a timeout
            value = await asyncio.wait_for(self._get_memcache(key=key), timeout=timeout)
        except (asyncio.TimeoutError, KeyError):
            # Timed out waiting for the memcache lookup, or KeyError as a result of cache eviction
            value = None

        # will only try to return a value from Redis if the memcache value does not exist
        if self._use_redis and (value is None):
            try:
                # Wait for the result of the redis lookup with a timeout
                redis_get = functools.partial(_async_redis_get, get=self._redis_client.get)
                value = await asyncio.wait_for(redis_get(_key=key), timeout=timeout)
            except (redis.exceptions.TimeoutError, asyncio.TimeoutError):
                # Timed out waiting for the redis lookup
                config_instance().DEBUG and self._logger.error("Timeout Error Reading from redis")
                value = None
            except redis.exceptions.ConnectionError:
                config_instance().DEBUG and self._logger.error("ConnectionError Reading from redis")
                value = None

        return await self._deserialize_value(value, value) if value else None

This method has a default timeout of 1 second, which means that if the value is not retrieved within 1 second from any of the cache stores, the method will return None.

The get method first attempts to retrieve the value from the in-memory cache, and if that fails, it attempts to retrieve it from Redis (if _use_redis is set to True).

The _async_redis_get function is a helper function used to fetch data from Redis asynchronously.

If the value is successfully retrieved, the _deserialize_value function is called to deserialize the value and return it.

In conclusion, the get method is an essential part of the cache class that enables retrieving values from the cache. By setting a timeout, we can ensure that the method doesn't block indefinitely, making it more efficient.

This means our gateway can be expected to respond within a bounded time, depending on the resource being accessed.
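The timeout behaviour described above boils down to wrapping the lookup in asyncio.wait_for and treating a TimeoutError as a cache miss. Here is a minimal standalone sketch; slow_lookup and get_with_timeout are hypothetical illustrations, not the gateway's actual functions:

```python
import asyncio

# Hypothetical lookup that is slower than the allotted timeout
async def slow_lookup(key: str) -> str:
    await asyncio.sleep(0.2)  # simulate a slow cache store
    return f"value-for-{key}"

async def get_with_timeout(key: str, timeout: float = 1.0):
    try:
        # Bound the lookup so a slow store cannot block the gateway indefinitely
        return await asyncio.wait_for(slow_lookup(key), timeout=timeout)
    except asyncio.TimeoutError:
        return None  # timed out: treat it as a cache miss

# With a 0.05 s budget the 0.2 s lookup times out and yields None
result = asyncio.run(get_with_timeout("user:42", timeout=0.05))
```

With a generous budget the same call returns the value; with a tight one the caller simply sees a miss and can fall through to the backend.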

Conclusion.

Implementing a caching system in our gateway has proven essential for keeping the performance of our system optimal.

It helps to minimize the latency and the response time, especially when processing requests that have previously been cached. Also, having the option to store cached data either in-memory or using a distributed cache system such as Redis gives us the flexibility to choose a caching mechanism that best suits our needs.

We believe that our implementation of this caching system is robust and efficient, as it has been thoroughly tested and optimized for performance. With this feature, our gateway is better equipped to handle high-volume requests without compromising the speed and responsiveness of the system.

If you’re interested in exploring our implementation of this caching system, you can check out the repository on GitHub. Additionally, we also have a Stock Market API that our gateway is a part of. You can find more information about it on our website or our GitHub repository.

