实际开发过程中线程安全问题发现与解决

问题的发现

笔者在开发一个设备信号强度 历史记录模块的一个功能
很简单 没啥问题
数据库表结构 大致为 id 、deviceId 、rssi 、time

image

发现 相同数据 插入了多条,天呐,这数据不就沉余了吗image

无脑先问问K哥如何处理

追溯源头发现 数据来源就是多条 相同的数据,因此插入数据库多条没毛病啊,但是我需要避免这种情况的发生,于是 第一时间想到的就是增加缓存,用缓存来限制数据不重复插入。 将deviceId为 Key 设置他的过期时间,在过期时间内如果遇到相同的deviceId 就不再重复插入。

然而 这简单的操作在 线程池中是无效的,image

废话不多说先上代码 示范代码

private int corePoolSize = 5;

    private int maxPoolSize = 20;

    private int queueCapacity = 128;

    private ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 60L, TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>(queueCapacity), new CustomizableThreadFactory("test"),
            new ThreadPoolExecutor.AbortPolicy());
    @Autowired
    private IAddressRepository addressRepository;
    //百度的一个map过期时间类 模拟我生产使用的 ehcache
    private ExpiryMap<String, String> filter = new ExpiryMap<>();
    @RequestMapping("test")
    @ResponseBody
    public Object ceshi(){
        //模拟出现一百条相同的信息
        for (int i = 0; i < 100; i++) {
            test();
        }
        return "ojbk";
    }
    public void test(){
        Address address = new Address();
        address.setCity("上海市");
        address.setAddress("和平路658弄");
        address.setProvince("上海");
        address.setDistrict("浦东新区");
        threadPool.submit(new Runnable() {
            @Override
            public void run() {
                if(filter.get("上海市") == null){
                    addressRepository.save(address);
                    filter.put("上海市","1");
                }


            }
        });
    }


成了, 浏览器访问一次 /test 是的 就一次。
image

让我们看看数据库增加了多少条记录
image

你没看错 是 5条记录~

正常情况 插入一次 就会设置 Key ,下次插入前会先取出key 如果 Key 已经存在了 就应该会跳过后面的插入逻辑, 但是这是在多线程中存在线程安全问题,get的时候是线程不安全的,因此我们需要解决线程不安全问题。

问题解决

从 mybatis 的源码中搬砖 org.apache.ibatis.cache.decorators.BlockingCache

 private long timeout = 5000L;
    private void acquireLock(Object key) {
        ReentrantLock lock = this.getLockForKey(key);
        if (this.timeout > 0L) {
            try {
                boolean e = lock.tryLock(this.timeout, TimeUnit.MILLISECONDS);
                if (!e) {
                    System.err.println("Couldn\'t get a lock in " + this.timeout + " for the key " + key + " at the cache ");
                }
            } catch (InterruptedException arg3) {
                System.err.println(arg3);
            }
        } else {
            lock.lock();
        }
    }
    
    public Object getCacheObject(String key, ExpiryMap<String, String> filter) {
        this.acquireLock(key);
        Object value = filter.get(key);
        if (value != null) {
            this.releaseLock(key);
        }

        return value;
    }
    private final ConcurrentHashMap<Object, ReentrantLock> locks = new ConcurrentHashMap<>();

    private ReentrantLock getLockForKey(Object key) {
        ReentrantLock lock = new ReentrantLock();
        ReentrantLock previous = (ReentrantLock) this.locks.putIfAbsent(key, lock);
        return previous == null ? lock : previous;
    }

    private void releaseLock(Object key) {
        ReentrantLock lock = (ReentrantLock) this.locks.get(key);
        if (lock.isHeldByCurrentThread()) {
            lock.unlock();
        }
    }

然后将原来的代码 改造一下,

threadPool.submit(new Runnable() {
            @Override
            public void run() {
                if(getCacheObject("上海市", filter) == null){
                    addressRepository.save(address);
                    filter.put("上海市","1");
                }


            }
        });

好了 我们再次访问/test 无论访问多少次
我们去查阅数据库可以发现 仅增加一条数据
image

控制台也能 看到出 由于没有获取锁而抛出的异常

1 个赞

你看这样行不行。
你在这个表新增一个列 hash varchar(255)。 设置唯一索引。
你在 insert 前,先计算出这条数据的hash值。先根据 hash 判断记录是否存在

SELECT 1 FROM `table` WHERE `hash` = ?;

如果结果不存在,才插入。

或者利用Set实现一个队列(Redis的zset, 或者内存中的LinkedHashSet)。用线程池消费写入数据库。

1 个赞