Speedup the Pipeline in Redis-py
pipelines is a way to speedup the execution of
redis commands when you use
redis-py library in
But what if the
pipeline is not able to finish commands in time either?
redis server accepts commands from clients and executes them. Then it puts the response in a queue and waits for the clients to fetch.
So a proper client should read the response once it sends commands.
That's why a
pipeline is still slow: it not only sends commands, but also waits and reads the response and parse it. And the latter procedure costs 60% time.
Solution 1: Hiredis
hiredis is a parser written in
C, which is extremely fast and it reduces 50% time the whole pipeline costs.
The Installation is easy:
pip install hiredis
Nothing else needs to be done. The
redis-py will automatically use
hiredis as the response parser if it's installed.
Solution 2: Ignore the response
This is nearly a black magic. If you need to keep it transactional or you want to get the response, do not use this method.
pipeline = redis_client.pipeline(transaction=False) # several pipeline commands: bla bla bla pipeline.command_stack = iter(pipeline.command_stack) pipeline.execute()
But remember, the response never disappears but stays in the response queue in the server. So if you need to execute a query command later, such as
HGET, you must first disconnect the client to force the server to clean the cache and then reconnect to the server to query. Otherwise you'll get the queued response responding the pipeline commands.
Even if there's nothing to query, you have to reconnect
redis to release the memory used by the cached responses. Otherwise the used memory of the server will keep increasing till crashing.
Here's the way to reconnect:
redis_client.connection_pool.disconnect() redis_client = get_redis_client()
So you may use a diy client like this:
class RedisClient(object): def __init__(self, *args, **kwargs): self.args = args self.kwargs = kwargs self.rd = redis.Redis(*args, **kwargs) self.rd4pp = None self.default_sock_stack = 0 self.sock_stack = self.default_sock_stack def __getattr__(self, attr): return self.rd.__getattribute__(attr) def connect(self): try: self.rd4pp.connection_pool.disconnect() except: pass self.rd4pp = redis.Redis(*self.args, **self.kwargs) self.sock_stack = self.default_sock_stack def pipeline(self, transaction=False): if transaction: return self.rd.pipeline(transaction=True) pp = self.rd4pp.pipeline(transaction=False) exe = pp.execute def execute(): if self.sock_stack <= 0: self.connect() pp.command_stack = iter(pp.command_stack) exe() self.sock_stack -= 1 return  pp.execute = execute return pp
Solution 3: Use the protocol, Luke
Take look at this article Redis Mass Insertion from
redis.io first and then you'll know we can simply use
socket to do mass insertion at a holy high speed: at least 3 times faster than Solution 2.
def connect(self, host='localhost', port=6379, db=0, password=None, *args, **kwargs): try: self.rd4pp.close() except: pass self.rd4pp = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.rd4pp.connect((host, port)) if password: self.rd4pp.send( '*2\r\n$4\r\nAUTH\r\n$%s\r\n%s\r\n' % (len(password), password)) if db: self.rd4pp.send( '*2\r\n$6\r\nSELECT\r\n$%s\r\n%s\r\n' % (len(db), db)) self.sock_stack = self.default_sock_stack def pipeline(self, transaction=False): pp = self.rd.pipeline(transaction=transaction) if transaction: return pp exe = pp.execute def execute(): if self.sock_stack < 0: self.connect(*self.args, **self.kwargs) self.rd4pp.send(''.join( '*%s\r\n' % len(cmd) + ''.join(['$%s\r\n%s\r\n' % (len(str(w)), w) for w in cmd]) for cmd, _ in pp.command_stack)) pp.command_stack =  self.sock_stack -= 1 return  pp.execute = execute return pp
Page Not Found
Try to search through the entire repo.