后端性能优化
MySQL
主要针对索引,对于MySQL优化的博文已经很多了,可以参阅,这里主要讲关于索引的几个点。
- 索引:MySQL的所有C端查询我们要尽量保证必须用到索引,尤其是那种数据量增长快的表。所以不要在索引字段进行函数运算,不要使用取反等等。
- 联合索引:联合索引使用可能是最多的了,sql没写对可能就导致了全表扫描,记住最左匹配原则,将区分度高的字段放在前面。
- 分页查询:C端经常展示一些列表,比如文章列表,作品列表等,基本都用到了分页查询,这里需要注意的是order by fieldName。要保证fieldName在索引中,不然就可能会出现filesort了,这是个外排序,在数据量大的时候是很致命的。在我们这个项目中,我没说的时候,所有的分页查询都有这个问题。
- 养成个好习惯:创建表的时候就设计好索引,不要到最后出事了再去加,那会数据量很大再加就迟了。
分库分表
一般预估年增量记录在500W以内都不需要分,我们之前是1000W以内就不分。如果只是临时做活动使用的就更不需要分了,除非活动期间量真的达到了几亿。简单介绍下分的原则
- 数据量大就分表,否则单表的查询性能肯定是下降很多
- 并发高就分库,因为单库的资源就那么多,支持的连接数不会太高。
- 大多数情况下分库分表是一起的,因为量大和并发高经常是同时出现。
- 一开始分就给足资源,二次分表成本太大,在快手一般上来就是10个库和1000张表。
- 需要特别注意的是shardKey,围绕用户展开的就用userId来做shardKey,这在绝大多数情况下没问题的。
- 如何分,见仁见智,可以使用shardingjdbc,优点很明显是减少开发成本。也可以自己手动分,优点也很明显,便于定制化和制定规则。如果你犹豫不决就shardingjdbc吧,尤其是小团队的时候。
注意事项
1、减少操作数据库次数
比如我要批量更新用户的奖励,有人会写这样的代码
for (UserReward r : rewardList) {
rewardDao.update(r);
}
如果rewardList
的size=10,那会操作数据库10次,也就是10次IO,而明显我们可以一次批量更新解决:
rewardDao.batchUpdate(rewardList);
虽然看着是那么理所当然,我们的项目甚至不少地方有下面类似的代码
User user = userService.getById(userId);
// doSomething
userService.updateById(user);
// doSomething2
userService.updateById(user);
// doSomething3
userService.updateById(user);
明明可以一次update就解决的为什么要update多次?
引申一下,有的时候甚至都不需要操作数据库,这种情况很多时候不容易发现。举个例子,之前活动的时候在活动页面会展示任务信息和用户获得的奖励。
// 查询用户任务
UserTask userTask = userTaskService.getById(id, userId);
// 构建外显响应
UserTaskResp userTaskVo = buildVo(userTask);
// 从结算表查询结算信息,从而得知获得的奖励
UserTaskSettlement s = userTaskSettlementService.getByTaskId(userTaskId, userId);
// 填充奖励信息
fillReward(userTaskVo, s);
活动页的流量是很大的,平白无故增大了结算表的流量。因为大部分用户是完不成任务的,所以根本没有获得奖励,所以可以判断用户是否完成了任务,只有完成了任务才去查询结算表即可。
使用迭代器查询
如果我们要查询的数据量很多,比如瓜分活动需要查出来所有参与并完成活动的用户,假设就一张表,有2000W数据,满足要求的用户可能有百万。如何做?
直接一个批量查询的sql?肯定是不行的
- 会出现包溢出异常
- 一次查询大量数据到内存,会占用大量内存甚至full gc。
- 正确的做法是分批查询,比如每次查询200条,然后进行处理。当然,不介意写while这种循环,一来不优雅,二来容易写出死循环的代码,最后也没法复用。可以参考下面的迭代器写法
import com.google.common.collect.AbstractIterator;
import org.apache.commons.collections.CollectionUtils;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
/**
* @Description 大批量数据迭代器 主要是针对表中数据过多循环查询问题
* List<User> list = Lists.newArrayList(new User(1,"1"), new User(2,"1"), new User(3,"1"),
* new User(4,"1"),new User(5,"1"),new User(6,"1"),new User(7,"1"));
* SimpleSingeFieldIterable<Integer, User> iterable = new SimpleSingeFieldIterable<>(0, 2,
* (position, count) -> list.stream().sorted(Comparator.comparing(User::getId))
* .filter(x -> x.getId() > position).limit(count).collect(Collectors.toList()), User::getId);
* for (List<User> list1 : iterable) {
* System.out.println(ObjectJsonMapper.toJSON(list1));
* }
*/
public class SimpleSingeFieldIterable<I extends Comparable<I>, R> implements Iterable<List<R>> {
private final BiFunction<I, Integer, List<R>> searchDAO;
private I searchPosition;
private final Integer count;
private final Function<R, I> model2IdFunction;
public SimpleSingeFieldIterable(I searchPosition, Integer count,
BiFunction<I, Integer, List<R>> searchDAO, Function<R, I> model2IdFunction) {
this.searchPosition = searchPosition;
this.count = count;
this.searchDAO = searchDAO;
this.model2IdFunction = model2IdFunction;
}
@Override
public Iterator<List<R>> iterator() {
return new SingeFieldIterator();
}
class SingeFieldIterator extends AbstractIterator<List<R>> {
private boolean needContinue = true;
@Override
protected List<R> computeNext() {
if (!needContinue) {
return endOfData();
}
List<R> result = searchDAO.apply(searchPosition, count);
if (CollectionUtils.isEmpty(result)) {
needContinue = false;
return endOfData();
}
if (result.size() < count) {
needContinue = false;
}
searchPosition = model2IdFunction.apply(result.get(result.size() - 1));
return CollectionUtils.isNotEmpty(result) ? result : endOfData();
}
}
}
使用
// 每次查询出来200条DocLibrary
SimpleSingeFieldIterable<Long, DocLibrary> docIterable =
new SimpleSingeFieldIterable<>(0L, 200,
(minId, count) -> baseMapper.getAllList(minId, count), DocLibrary::getId);
AtomicInteger counter = new AtomicInteger(0);
docIterable.forEach(list -> {
LOGGER.info("start batchUpdate! count : {}", counter.get());
List<DocLibrary> effectiveList = list.stream()
.filter(x -> StringUtils.isEmpty(x.getFileType())).collect(Collectors.toList());
for (DocLibrary doc : effectiveList) {
String fileName = doc.getFileName();
String suffix = StringUtils.substringAfterLast(fileName, ".");
doc.setFileType(suffix);
}
baseMapper.batchUpdate(list);
LOGGER.info("start batchUpdate! count : {}", counter.addAndGet(list.size()));
});
@Mapper
public interface DocLibraryMapper extends BaseMapper<DocLibrary> {
void batchUpdate(List<DocLibrary> list);
@Select("select * from doc_library where id > #{minId} order by id asc limit #{size}")
List<DocLibrary> getAllList(@Param("minId") long minId, @Param("size") int size);
}
做好SQL监控
对于大型C端项目,数据量可能很大,一定要做好SQL监控,也就是sql调用量,qps,耗时等。毕竟人是不可靠的,即使你能保证你没问题,但你保证不了别人。
缓存
关于缓存Redis,这里只做一些简单罗列。
1、使用Redis加快访问:大厂里面肯定没问题,小厂里面很多项目几乎看不使用redis,全是直接搂数据库。注意下缓存一致性即可,大部分场景使用Cache Aside策略即可。
2、Redis大key问题:大的String,比如1M。集合数据太多,比如5000个。这都是大key问题,redis处理仍然是单线程,大key会拖慢整个redis,并且影响带宽。常用解决方案就是拆分成多个小key,放到多个节点即可。
3、redis热key问题:如果某个redis的key访问量很高,那么这个key就是热key,比如我们可以认为该key的qps达到5000(看业务)那么就是热key,解决方案:
- 使用本地缓存:可以是guava的cache也可以是快手开源的全局本地缓存。
- 使用memcached:memcached扛热点能力可比redis好多了。
- 冗余写,随机读:也就是写的时候写多个副本到不同的redis节点,读的时候选其中一个节点读。
4、关于缓存击穿,一般发生在缓存过期后大量请求落到数据库,这个缓存不一定是说redis,也可以是本地缓存
- 如果是本地缓存,一般就是guava的cache,使用
load miss
方法即可,也就是cache.get(key, Callable),内部是会加锁的。 - 如果是redis,那么最容易想到的就是分布式锁,当然,也可以写个cacheSetter服务,思路是保证相同的key落到同一台服务器,然后使用jvm级别的锁去加锁处理即可;
import com.google.common.collect.Lists;
import org.apache.commons.collections4.CollectionUtils;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
/**
* @Description 分布式加载缓存的rpc服务,如果部署了多台机器那么调用端最好使用id做一致性hash保证相同id的请求打到同一台机器。
**/
public abstract class AbstractCacheSetterService implements CacheSetterService {
private final ConcurrentMap<String, CountDownLatch> loadCache = new ConcurrentHashMap<>();
private final Object lock = new Object();
@Override
public void load(Collection<String> needLoadIds) {
if (CollectionUtils.isEmpty(needLoadIds)) {
return;
}
CountDownLatch latch;
Collection<CountDownLatch> loadingLatchList;
synchronized (lock) {
loadingLatchList = excludeLoadingIds(needLoadIds);
needLoadIds = Collections.unmodifiableCollection(needLoadIds);
latch = saveLatch(needLoadIds);
}
System.out.println("needLoadIds:" + needLoadIds);
try {
if (CollectionUtils.isNotEmpty(needLoadIds)) {
loadCache(needLoadIds);
}
} finally {
release(needLoadIds, latch);
block(loadingLatchList);
}
}
/**
* 加锁
* @param loadingLatchList 需要加锁的id对应的CountDownLatch
*/
protected void block(Collection<CountDownLatch> loadingLatchList) {
if (CollectionUtils.isEmpty(loadingLatchList)) {
return;
}
System.out.println("block:" + loadingLatchList);
loadingLatchList.forEach(l -> {
try {
l.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
/**
* 释放锁
* @param needLoadIds 需要释放锁的id集合
* @param latch 通过该CountDownLatch来释放锁
*/
private void release(Collection<String> needLoadIds, CountDownLatch latch) {
if (CollectionUtils.isEmpty(needLoadIds)) {
return;
}
synchronized (lock) {
needLoadIds.forEach(id -> loadCache.remove(id));
}
if (latch != null) {
latch.countDown();
}
}
/**
* 加载缓存,比如根据id从db查询数据,然后设置到redis中
* @param needLoadIds 加载缓存的id集合
*/
protected abstract void loadCache(Collection<String> needLoadIds);
/**
* 对需要加载缓存的id绑定CountDownLatch,后续相同的id请求来了从map中找到CountDownLatch,并且await,直到该线程加载完了缓存
* @param needLoadIds 能够正在去加载缓存的id集合
* @return 公用的CountDownLatch
*/
protected CountDownLatch saveLatch(Collection<String> needLoadIds) {
if (CollectionUtils.isEmpty(needLoadIds)) {
return null;
}
CountDownLatch latch = new CountDownLatch(1);
needLoadIds.forEach(loadId -> loadCache.put(loadId, latch));
System.out.println("loadCache:" + loadCache);
return latch;
}
/**
* 哪些id正在加载数据,此时持有相同id的线程需要等待
* @param ids 需要加载缓存的id集合
* @return 正在加载的id所对应的CountDownLatch集合
*/
private Collection<CountDownLatch> excludeLoadingIds(Collection<String> ids) {
List<CountDownLatch> loadingLatchList = Lists.newArrayList();
Iterator<String> iterator = ids.iterator();
while (iterator.hasNext()) {
String id = iterator.next();
CountDownLatch latch = loadCache.get(id);
if (latch != null) {
loadingLatchList.add(latch);
iterator.remove();
}
}
System.out.println("loadingLatchList:" + loadingLatchList);
return loadingLatchList;
}
}
- 如果能接受一定的脏数据,那么甚至可以设置缓存永不过期,但是要设置个逻辑过期字段,如果过期了异步加载即可。
String get(String key) {
V v = redis.get(key);
if (v.getLogicTime() < System.currentTimeMills()) {
String mutexkey = buildMutexKey(key);
if (redis.set(mutexKey, "1", "ex", 100, "nx")) {
executor.execute(() -> {//重建缓存})
}
}
return v.getValue();
}
- 集群隔离,为了保证业务直接不互相影响,最好做下集群隔离,比如任务系统使用task集群,奖励系统使用reward集群。
- 多级缓存:如果并发超高,可以考虑使用多级缓存,比如对于活动系统,在后台创建活动然后下发给用户,活动相关的基础信息是不变的,完全可以使用本地缓存+redis+mysql,设置可以加一层全局本地缓存。
- 内存布隆过滤器
实战案例
布隆过滤可用来高效判断一个元素是否存在于一个集合(小概率存在假阳性,需要二次判断),因此可以用来作为流量漏斗。
实际场景例如某个功能只对少部分用户开放,功能入口的查询流量很高,需要高性能的判断用户是否拥有该功能权限,但权限查询接口扛不住这么高的流量,此时使用布隆过滤则可以过滤掉所有无权限用户的请求,仅处理少部分可能有权限的用户查询即可。
由于源数据查询服务性能有限,增加了一层对外查询服务,在该服务启动时增加钩子,需要从源数据查询服务拉取到全量的源数据构建内存中的BloomFilter,构建完成后再开始对外提供服务。若源数据会更新,则运行期间需要定时reload源数据以重新构建BloomFilter。
对外查询服务通过BloomFilter来拦截掉大部分明确不存在于源数据集中的查询请求,从而使得源数据查询服务得到保护。
消息队列与流量聚合
MQ
MQ(消息队列)主要是来做异步削峰的,比如直播间点赞,收礼等,如果服务端收到请求后直接同步操作数据库那么晚高峰的时候对数据库就是灾难的,所以必须使用消息队列来处理。
场景介绍
这里就举两个常见的例子
- 电商秒杀:每次用户秒杀的结果虽然在页面展示了,实际上一般是把请求丢到了消息队列了,后续慢慢消费处理(db中库存的改动等);
- 直播间收礼:在一些大V开直播的时候,直播收礼的流量10W/s很正常,服务端需要对礼物进行归类和统计等处理,这显然也可以使用mq去异步处理;
注意事项
我们需要关注的是消费速度和性能之间的平衡,如果消费过快,那么下游扛不住,如果消费过慢又会导致消息堆积,消费到冷数据并影响业务。
解决思路
拿RocketMQ来举例,消费者从broker拉取的消息是极快的,一般这里不会成为性能瓶颈,往往成为性能瓶颈的是业务的IO操作,所以发现消息堆积了的话,先从IO那块想办法解决。比如是不是流量增大了导致消费跟不上,这时候可以考虑在不影响性能的情况下调整消费线程数或者消费者扩容。或者是不是一些IO接口出问题了。
流量聚合
流量聚合简单来说就是把多次的请求整合为一个请求处理,显然只有在业务对单次的请求不敏感时并且能接受一定延迟时才能使用,比如直播间点赞,主播对单次点赞根本不敏感,直播间展示的赞数也不是实时的,完全可以对多次点赞进行聚合,最后再进行一些列判断再累计起来存放到db或缓存中,直播间从db或缓存中进行拉取。
比如我要统计每天消耗大模型的token数,当每次有相关接口调用的时候就将消耗的token数发到消息队列,然后在消费者侧进行流量聚合。
再比如,我们使用binlog监听工具用户任务表(散了1000张表),每次产生用户任务信息我们就往ES写(超高流量),然后在后台我们去ES搜索,所以我们也可以每次监听到消息后进行聚合,最后批量写到ES。
那么这个聚合的工具需要什么功能呢?简单来说主要就三点
- 提供个能存放数据的容器
- 当达到指定数量时输出容器中的数据
- 当达到指定时候时输出容器中的数据
刚好快手提供了这么个工具-BufferTrigger,这个工具在快手内部大量使用,尤其是主站直播,下面将简单介绍下这个简单易用的工具。
public class BufferTrigger<E> {
private static final Logger logger = getLogger(BufferTrigger.class);
private final BlockingQueue<E> queue;
private final int batchSize;
private final long lingerMs;
private final ThrowableConsumer<List<E>, Exception> consumer;
private final ScheduledExecutorService scheduledExecutorService;
private final ReentrantLock lock = new ReentrantLock();
private final AtomicBoolean running = new AtomicBoolean();
private BufferTrigger(long lingerMs, int batchSize, int bufferSize,
ThrowableConsumer<List<E>, Exception> consumer) {
this.lingerMs = lingerMs;
this.batchSize = batchSize;
this.queue = new LinkedBlockingQueue<>(max(bufferSize, batchSize));
this.consumer = consumer;
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
// 执行一次调度
this.scheduledExecutorService.schedule(new BatchConsumerRunnable(), this.lingerMs,
MILLISECONDS);
}
public void enqueue(E element) {
try {
queue.put(element);
tryTrigBatchConsume();
} catch (InterruptedException e) {
currentThread().interrupt();
}
}
private void tryTrigBatchConsume() {
if (queue.size() >= batchSize) {
if (lock.tryLock()) {
try {
if (queue.size() >= batchSize) {
if (!running.get()) { // prevent repeat enqueue
this.scheduledExecutorService.execute(this::doBatchConsumer);
running.set(true);
}
}
} finally {
lock.unlock();
}
}
}
}
public void manuallyDoTrigger() {
doBatchConsumer();
}
private void doBatchConsumer() {
lock.lock();
try {
running.set(true);
while (!queue.isEmpty()) {
List<E> toConsumeData = new ArrayList<>(min(batchSize, queue.size()));
queue.drainTo(toConsumeData, batchSize);
if (!toConsumeData.isEmpty()) {
doConsume(toConsumeData);
}
}
} finally {
running.set(false);
lock.unlock();
}
}
private void doConsume(List<E> toConsumeData) {
try {
consumer.accept(toConsumeData);
} catch (Throwable e) {
logger.error("doConsume failed", e);
}
}
private class BatchConsumerRunnable implements Runnable {
@Override
public void run() {
try {
doBatchConsumer();
} finally {
scheduledExecutorService.schedule(this, lingerMs, MILLISECONDS);
}
}
}
public long getPendingChanges() {
return queue.size();
}
public static <E> BufferTriggerBuilder<E> newBuilder() {
return new BufferTriggerBuilder<>();
}
public static class BufferTriggerBuilder<E> {
private int batchSize;
private int bufferSize;
private Duration duration;
private ThrowableConsumer<List<E>, Exception> consumer;
public BufferTriggerBuilder<E> batchSize(int batchSize) {
this.batchSize = batchSize;
return this;
}
public BufferTriggerBuilder<E> bufferSize(int bufferSize) {
this.bufferSize = bufferSize;
return this;
}
public BufferTriggerBuilder<E> duration(Duration duration) {
this.duration = duration;
return this;
}
public BufferTriggerBuilder<E> consumer(ThrowableConsumer<List<E>, Exception> consumer) {
this.consumer = consumer;
return this;
}
public BufferTrigger<E> build() {
Preconditions.checkArgument(batchSize > 0, "batchSize 必须大于0");
Preconditions.checkArgument(bufferSize > 0, "bufferSize 必须大于0");
Preconditions.checkNotNull(duration, "duration未设置");
Preconditions.checkNotNull(consumer, "消费函数未设置");
return new BufferTrigger<>(duration.toMillis(), batchSize, bufferSize, consumer);
}
}
}
测试
public class BufferTriggerDemo {
private static AtomicInteger counter = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
BufferTrigger<Long> trigger = BufferTrigger.<Long>newBuilder()
.batchSize(10)
.bufferSize(100)
.duration(Duration.ofSeconds(5))
.consumer(BufferTriggerDemo::consume)
.build();
for (long i = 0; i < 10000; i ++) {
trigger.enqueue(i);
TimeUnit.MILLISECONDS.sleep(100);
}
}
private static void consume(List<Long> list) {
System.out.printf("次数 %d, result : %s \n", counter.incrementAndGet(), list);
}
}
full gc陷阱
需要注意的是BufferTrigger是单线程消费的,这个是一个比较大的陷阱,做活动的时候是踩了坑的,尤其是在消费操作中涉及到大量io操作的场景,因为在流量很高的时候可能会出现消费速度跟不上生产速度,这很容易导致full gc问题。所以如果有必要的话需要使用线程池来提升消费速度。
消息队列和BufferTrigger的组合方式
有没有考虑过消息队列的消费速度过慢,如何在不影响下游性能的情况下提升消费速?比如直播点赞,在mq每次收到一条点赞消息的时候是不是就可以使用BufferTrigger来进行聚合?然后每分钟消费一次,在流量剧增的时候是不是能十倍以上的提速?
线程池
线程池作为jdk的一个重要组件,同时也是性能优化的常客,不得不谈。
1、如果异步,尤其现在大模型的场景,基本上是必不可少。比如你要请求的两个接口没有关联性,可以考虑使用线程池去并发请求。
2、对于非核心逻辑我们也可以使用线程池处理,比如用户完成任务后要给他发个触达(短信,私信等),可以使用线程池去异步处理。
3、当然,存在明显的问题就是服务重启就没了,所以对性能要求高的服务还是得使用MQ。
4、服务端大多是IO型操作,所以能够将线程数调大一点,起始值我们可以设置为核数*2。要设置一个合理的值,我们可以加上监控,尤其要监控活跃线程数和堆积数,从而来调整一个合适的值,并且帮助快速发现问题。
之前出现一个问题可以作为借鉴,线上大模型相关接口经常卡住,查了半天猜测是线程池里面的任务处理耗时太长,甚至卡住了,导致新的请求进了队列并一直得不到消费,为了验证这个问题,加上了线程池监控,发现确实和猜测的吻合。
1、如果你使用rpc,比如thirft,那么一定要使用自定义的线程池,并且监控,之前在小爱的时候就出现rpc接口在早高峰耗时上涨,究其原因是因为流量上涨,导致线程池处理不过来,跟上面说的问题很类似。
2、此外建议给线程池命名,这样在发生问题使用jstack来dump线程堆栈的时候好分析问题。
[root@turbodesk-api-canary-c9b685f9-ctclm home]# jstack 1 > thread.log
[root@turbodesk-api-canary-c9b685f9-ctclm home]# ll
total 752
-rw-r--r-- 1 root root 769667 Jan 17 17:47 thread.log
[root@turbodesk-api-canary-c9b685f9-ctclm home]# grep java.lang.Thread.State thread.log | awk '{print $2$3$4$5}' | sort | uniq -c
69 RUNNABLE
3 TIMED_WAITING(onobjectmonitor)
8 TIMED_WAITING(parking)
6 TIMED_WAITING(sleeping)
2 WAITING(onobjectmonitor)
706 WAITING(parking)
可以使用[Smart Java thread dump analyzer - thread dump analysis in seconds](https://fastthread.io/)
来帮忙分析
如果你的接口是间歇性的有大量请求,可以将核心线程数调小一些,避免白创建太多线程处于waitting状态。系统能创建的线程数是有限的,如何计算我就不谈了,比如我们的系统最多能创建1W个线程,之前有个同事的接口,在每次请求的时候创建了只有一个线程的线程池,导致服务经常重启。
其他技巧
请求丢弃
在很多业务场景下,丢弃部分请求完全不影响业务,给个合理的提示的话用户根本无法感知,这其中最常见的就是秒杀、抢红包、发弹幕这类业务,请求量很大,但只有少部分请求能拿到钱,大部分请求直接丢弃都是没问题的,到时候告诉用户没抢到或者抢没了或者弹幕发生成功即可。
预处理
这类场景在抢红包的时候经常用到,可能很多人都知道很多app抢红包其实并不是在用户每次去抢的时候再去计算金额并落库的,一般是在红包发出去的时候就计算好了,比如1W块钱,100个人,那么就会随机生成100个数额推入到缓存的队列,用户抢的时候从队列直接pop即可。
当然,可以通过消息队列异步落库,也就是同步写缓存,异步写db。
又或者是在用户完成活动的时候得到一个资格,然后在指定的时间点去瓜分1W块钱,其实也是在这时间点之前就把能用的金额和获得资格的人拿出来,计算出每个人的金额再存储到缓存和db,最后用户虽然说是去抢,其实就是从缓存中把已经给他存好的钱拿出来了而已。