利用redis分布式锁来防止数据重复入库

最近发现每隔一段时间就会出现一两条重复入库的数据,经检查发现是消息队列那边同一时间发送了几笔一样的订单数据过来。而原来入库时的验重逻辑是入库前先去数据库查询一次,没有发现该条数据则入库,这样的操作并不是原子性的,会导致如果有两条一样的数据同时过来,同一时间查询数据库的话,会发现并没有这条数据,这样就会将这两条一样的数据同时入库。

因此,我们需要在入库前加一把锁,使得同时过来的同样的数据只有一条能去进行查询数据库验重这步操作。下面上代码,例子使用的是laravel框架:

首先封装出一个redis加锁于释放锁的方法:

class MyRedis
{
    /**
     * 加锁
     * @param string $key 锁名
     * @param string $requestId 唯一请求ID,根据这个表示来判断是哪个客户端加的锁,从而保证加解锁的唯一性
     * @param int $expireTime 过期时间
     * @return string 成功加锁返回请求ID否则返回false
     */
    public static function addLock(string $key, string $requestId, int $expireTime)
    {
        $res = Redis::set($key, $requestId, 'PX', $expireTime, 'NX');
        return $res ? $requestId : $res;
    }

    /**
     * 解锁
     * @param string $key 锁名
     * @param string $requestId 唯一请求ID,根据这个表示来判断是哪个客户端加的锁,从而保证加解锁的唯一性
     * @return mixed
     */
    public static function releaseLock(string $key, string $requestId)
    {
        //使用Lua脚本来实现解锁, 从而保证解锁的原子性,语句的意思的如果通过key获取的值与传递过来的参数相等,就删除这个key
        $lua = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        $res = Redis::eval($lua, 1, $key, $requestId);
        return $res;
    }
}

关于唯一请求ID,是为了防止锁被误解,下面是网络上找的一个php生成唯一请求ID的DEMO:

class RequestID
{
    /**
     * 生成唯一请求id
     * @return String
     */
    public static function generate(){

        // 使用session_create_id()方法创建前缀
        $prefix = session_create_id(date('YmdHis'));

        // 使用uniqid()方法创建唯一id
        $request_id = strtoupper(md5(uniqid($prefix, true)));

        // 格式化请求id
        return self::format($request_id);
    }

    /**
     * 格式化请求id
     * @param string $request_id 请求ID
     * @param string $format 格式
     * @return string
     */
    private static function format(string $request_id, $format='8,4,4,4,12')
    {
        $tmp = array();
        $offset = 0;
        $cut = explode(',', $format);

        // 根据设定格式化
        if($cut){
            foreach($cut as $v){
                $tmp[] = substr($request_id, $offset, $v);
                $offset += $v;
            }
        }

        // 加入剩余部分
        if($offset<strlen($request_id)){
            $tmp[] = substr($request_id, $offset);
        }

        return implode('-', $tmp);
    }
}

最后就是示例代码了:

/**
 * 利用redis分布锁防止数据重复入库
 * @param Request $request
 */
public function testLock(Request $request)
{
    $requestID = $request->input('requestID');
    $data = [
        'dm_id' => $request->input('dm_id'),
        'pay_time' => $request->input('pay_time'),
        'money' => $request->input('money')
    ];
    $key = md5(json_encode($data)); //格式化数据作为key
    if (MyRedis::addLock($key, $requestID, 60)) { //如果加锁成功,则进行数据入库
        $res = DB::connection('test')->table('order')->where($data)->get(); //入库之前先进行判重
        if($res->isEmpty()) {
            DB::connection('test')->table('order')->insert($data); //如果没有该数据,则入库
            //此处为简化不在进行数据入库失败判断
            MyRedis::releaseLock($key, $requestID); //数据入库之后解锁
        }
    }
}

利用多进程+guzzle进行多并发请求测试,在3000并发下,也并未出现数据重复入库的情况


书山有路勤为径 学海无涯苦作舟