Speed Up the Pipeline in redis-py

Using pipelines is one way to speed up the execution of Redis commands when you use the redis-py library in a Python script.

But what if the pipeline itself is still not fast enough?

Internally, the Redis server accepts commands from a client and executes them. It then puts each reply in a queue for that client and waits for the client to read it.

So a well-behaved client has to read the replies after it sends commands.

That's why a pipeline can still be slow: it not only sends the commands, but also waits for the replies, reads them and parses them. This second part accounts for about 60% of the time.
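To make the "parse" step concrete, here is a minimal pure-Python parser for the RESP2 reply types a pipeline usually gets back. It is an illustrative sketch, not redis-py's own parser (which covers more cases and, without hiredis, is also written in Python). Every reply costs a line scan, a type dispatch and a conversion, and a pipeline pays that once per queued command.

```python
def parse_replies(buf: bytes) -> list:
    """Parse a buffer of complete RESP2 replies into Python objects."""
    replies = []
    pos = 0

    def read_line():
        nonlocal pos
        end = buf.index(b"\r\n", pos)
        line = buf[pos:end]
        pos = end + 2
        return line

    def parse_one():
        nonlocal pos
        line = read_line()
        kind, rest = line[:1], line[1:]
        if kind == b"+":          # simple string, e.g. +OK
            return rest.decode()
        if kind == b"-":          # error reply
            return Exception(rest.decode())
        if kind == b":":          # integer
            return int(rest)
        if kind == b"$":          # bulk string; length -1 means nil
            length = int(rest)
            if length == -1:
                return None
            data = buf[pos:pos + length]
            pos += length + 2     # skip the payload and its trailing \r\n
            return data
        if kind == b"*":          # array of nested replies; count -1 means nil
            count = int(rest)
            if count == -1:
                return None
            return [parse_one() for _ in range(count)]
        raise ValueError("unknown reply type %r" % kind)

    while pos < len(buf):
        replies.append(parse_one())
    return replies


replies = parse_replies(b"+OK\r\n:2\r\n$5\r\nhello\r\n$-1\r\n*2\r\n:1\r\n$1\r\nx\r\n")
print(replies)  # ['OK', 2, b'hello', None, [1, b'x']]
```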

Solution 1: Hiredis

hiredis is a reply parser written in C. It is extremely fast and cuts the total time of the pipeline by about 50%.

Installation is easy:

pip install hiredis

Nothing else needs to be done: redis-py automatically uses hiredis as its response parser when it is installed.
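If you want to confirm that the C parser is actually picked up, a small check like the one below may help. It assumes your redis-py version exposes the HIREDIS_AVAILABLE flag in redis.utils (recent versions do); if that import fails it falls back to asking whether the hiredis module is importable at all. The function name hiredis_in_use is hypothetical.

```python
import importlib.util


def hiredis_in_use() -> bool:
    """Best-effort check that redis-py will use hiredis as its reply parser."""
    try:
        # assumption: this redis-py version defines redis.utils.HIREDIS_AVAILABLE
        from redis.utils import HIREDIS_AVAILABLE
        return bool(HIREDIS_AVAILABLE)
    except ImportError:
        # redis-py missing or too old for the flag: just check the module exists
        return importlib.util.find_spec("hiredis") is not None


print("hiredis in use:", hiredis_in_use())
```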

Solution 2: Ignore the response

This is close to black magic. If you need transactional behavior or you need the responses, do not use this method.

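import redis

redis_client = redis.Redis()
pipeline = redis_client.pipeline(transaction=False)
# queue several commands here, e.g. pipeline.set('key', 'value')
# swap in an iterator: the pass that sends commands consumes it, leaving nothing for the reply-reading pass
pipeline.command_stack = iter(pipeline.command_stack)
pipeline.execute()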
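Why does replacing the list with an iterator skip the replies? A plausible reading is that redis-py's non-transactional pipeline walks command_stack twice: once to pack and send every command, and once to read one reply per command. The toy class below (hypothetical, no Redis or redis-py involved) only mimics that two-pass structure to show the effect; it is not redis-py code.

```python
class ToyPipeline:
    """Mimics the two passes a pipeline makes over command_stack."""

    def __init__(self):
        self.command_stack = []
        self.sent = []          # stands in for bytes written to the socket
        self.replies_read = 0

    def set(self, key, value):
        self.command_stack.append((("SET", key, value), {}))
        return self

    def execute(self):
        commands = self.command_stack
        # pass 1: pack and send every command in one write
        self.sent.extend(args for args, _ in commands)
        # pass 2: read one reply per command (an exhausted iterator yields nothing)
        response = []
        for _args, _options in commands:
            self.replies_read += 1
            response.append("OK")
        self.command_stack = []
        return response


normal = ToyPipeline().set("a", 1).set("b", 2)
normal_result = normal.execute()
print(normal_result, normal.replies_read)  # ['OK', 'OK'] 2

tricked = ToyPipeline().set("a", 1).set("b", 2)
tricked.command_stack = iter(tricked.command_stack)
tricked_result = tricked.execute()
print(tricked_result, tricked.replies_read, len(tricked.sent))  # [] 0 2
```

Both pipelines send the same two commands; only the second one skips reading the replies, which is exactly why those replies are left waiting on the connection.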

But remember: the replies do not disappear. They stay queued for that connection. So if you later need to run a query command on the same connection, such as HGET, you must first disconnect, which makes the server discard the pending replies, and then reconnect before querying. Otherwise you will read the queued replies to the pipeline commands instead of the answer to your query.

Even if you never query, you still have to reconnect to release the memory held by the pending replies. Otherwise the server's memory usage will keep growing until it crashes.
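The stale-reply problem can be reproduced without a Redis server. The sketch below uses socket.socketpair() and plays the server by hand: bytes written on one end queue up until the other end reads them, much like unread replies queue up for a Redis connection. The replies are hand-written RESP and the function name stale_reply_demo is hypothetical.

```python
import socket


def stale_reply_demo() -> bytes:
    client, server = socket.socketpair()
    reader = client.makefile("rb")
    try:
        # the "server" answers three pipelined SETs that the client never reads
        server.sendall(b"+OK\r\n" * 3)
        # later the client sends HGET; its answer is queued behind the old replies
        server.sendall(b"$5\r\nvalue\r\n")
        # the first line the client reads belongs to the pipeline, not to HGET
        return reader.readline()
    finally:
        reader.close()
        client.close()
        server.close()


print(stale_reply_demo())  # b'+OK\r\n'
```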

Here's how to reconnect:

redis_client.connection_pool.disconnect()
redis_client = get_redis_client()  # your own factory that builds a new redis.Redis client

So you might wrap all of this in a DIY client like this:

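import redis


class RedisClient(object):
    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs
        self.rd = redis.Redis(*args, **kwargs)  # normal client for queries
        self.rd4pp = None                       # separate client for fire-and-forget pipelines
        # pipeline executes allowed before reconnecting to free the queued replies
        self.default_sock_stack = 0
        self.sock_stack = self.default_sock_stack

    def __getattr__(self, attr):
        # delegate everything else (get, hget, ...) to the normal client
        return getattr(self.rd, attr)

    def connect(self):
        # disconnecting makes the server discard the replies queued for us
        if self.rd4pp is not None:
            self.rd4pp.connection_pool.disconnect()
        self.rd4pp = redis.Redis(*self.args, **self.kwargs)
        self.sock_stack = self.default_sock_stack

    def pipeline(self, transaction=False):
        if transaction:
            return self.rd.pipeline(transaction=True)
        # (re)connect before building the pipeline so it uses the fresh client
        if self.rd4pp is None or self.sock_stack <= 0:
            self.connect()
        pp = self.rd4pp.pipeline(transaction=False)
        exe = pp.execute

        def execute():
            # send every command but read none of the replies
            pp.command_stack = iter(pp.command_stack)
            exe()
            self.sock_stack -= 1
            return []

        pp.execute = execute
        return pp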
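The default_sock_stack setting is not explained above; reading the code, it is the number of fire-and-forget executes allowed on one connection before reconnecting. The toy model below (hypothetical class ReconnectBudget, no Redis involved) replays the same counter rule: reconnect when there is no connection yet or when the counter has dropped to 0 or below, then decrement once per execute.

```python
class ReconnectBudget:
    """Toy model of the sock_stack counter in RedisClient."""

    def __init__(self, default_sock_stack=0):
        self.default_sock_stack = default_sock_stack
        self.sock_stack = default_sock_stack
        self.connected = False
        self.connects = 0

    def connect(self):
        self.connected = True
        self.connects += 1
        self.sock_stack = self.default_sock_stack

    def execute(self):
        # same rule as the client: no connection yet, or budget used up
        if not self.connected or self.sock_stack <= 0:
            self.connect()
        self.sock_stack -= 1


def count_connects(budget, executes=6):
    model = ReconnectBudget(budget)
    for _ in range(executes):
        model.execute()
    return model.connects


for budget in (0, 1, 2, 3):
    print(budget, count_connects(budget))
```

Under this rule, budgets of 0 and 1 both reconnect on every execute, while a budget of N ≥ 1 reconnects once every N executes, so raising it trades server memory for fewer reconnects.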

Solution 3: Use the protocol, Luke

Take a look at the article "Redis Mass Insertion" on redis.io first, and you'll see that we can simply write the Redis protocol to a plain socket to do mass insertion at very high speed: at least 3 times faster than Solution 2.

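# drop-in replacements for RedisClient.connect and RedisClient.pipeline above
import socket


def _pack_command(*args):
    # encode one command as a RESP array; $ lengths must be byte lengths
    out = [b'*%d\r\n' % len(args)]
    for arg in args:
        if not isinstance(arg, bytes):
            arg = str(arg).encode('utf-8')
        out.append(b'$%d\r\n%s\r\n' % (len(arg), arg))
    return b''.join(out)


def connect(self, host='localhost', port=6379, db=0, password=None,
            *args, **kwargs):
    # closing the old socket makes the server drop the replies still queued for it
    if self.rd4pp is not None:
        self.rd4pp.close()
    self.rd4pp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.rd4pp.connect((host, port))
    # the AUTH/SELECT replies are never read either; the next reconnect drops them
    if password:
        self.rd4pp.sendall(_pack_command('AUTH', password))
    if db:
        self.rd4pp.sendall(_pack_command('SELECT', db))
    self.sock_stack = self.default_sock_stack


def pipeline(self, transaction=False):
    # the redis-py pipeline is only used to collect the commands
    pp = self.rd.pipeline(transaction=transaction)
    if transaction:
        return pp

    def execute():
        if self.rd4pp is None or self.sock_stack <= 0:
            self.connect(*self.args, **self.kwargs)
        # each command_stack entry is (args, options); only args go on the wire
        self.rd4pp.sendall(b''.join(
            _pack_command(*args) for args, _ in pp.command_stack))
        pp.command_stack = []
        self.sock_stack -= 1
        return []

    pp.execute = execute
    return pp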
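The Mass Insertion article also describes a second route for one-off bulk loads: generate the protocol into a file and pipe it into redis-cli --pipe. Below is a minimal sketch of such a generator, assuming plain SET commands; encode_command and write_mass_insert are hypothetical names. Note that the $ length is the byte length of the UTF-8 encoding, which differs from the character count for non-ASCII text.

```python
import io


def encode_command(*args) -> bytes:
    """Encode one command as a RESP array of bulk strings."""
    parts = [b"*%d\r\n" % len(args)]
    for arg in args:
        data = arg if isinstance(arg, bytes) else str(arg).encode("utf-8")
        parts.append(b"$%d\r\n%s\r\n" % (len(data), data))
    return b"".join(parts)


def write_mass_insert(out, items):
    """Write one SET command per (key, value) pair to a binary stream."""
    for key, value in items:
        out.write(encode_command("SET", key, value))


buf = io.BytesIO()
write_mass_insert(buf, [("k1", "v1"), ("name", "héllo")])
print(buf.getvalue())
```

To use it for real, write to a file opened in binary mode, e.g. open("data.txt", "wb"), and load it with cat data.txt | redis-cli --pipe as the article shows.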
