● 公司核心系统内存泄露排查(后续)

前言

晚上登录了下服务器,发现api还是存在内存上涨的情况。查看了api代码,确认是csredis的publish出了问题。把csredis源码拖了下来,看下到底是哪里出的问题。

源码排查

下载地址: https://github.com/2881099/csredis/archive/refs/heads/master.zip

  1. CSRedisCore/RedisHelper.cs
/// <summary>
/// 用于将信息发送到指定分区节点的频道,最终消息发布格式:1|message
/// </summary>
/// <param name="channel">频道名</param>
/// <param name="message">消息文本</param>
/// <returns></returns>
public static long Publish(string channel, string message) => Instance.Publish(channel, message);
  1. 跳转到CSRedisCore/CSRedisClient.csInstance.Publish方法:
/// <summary>
/// 用于将信息发送到指定分区节点的频道,最终消息发布格式:1|message
/// </summary>
/// <param name="channel">频道名</param>
/// <param name="message">消息文本</param>
/// <returns></returns>
public long Publish(string channel, string message)
{
    var msgid = HIncrBy("csredisclient:Publish:msgid", channel, 1);
    return ExecuteScalar(channel, (c, k) => c.Value.Publish(channel, $"{msgid}|{message}"));
}
  1. 跳转到CSRedisCore/RedisClient.Sync.csc.Value.Publish方法:
/// <summary>
/// Post a message to a channel
/// </summary>
/// <param name="channel">Channel to post message</param>
/// <param name="message">Message to send</param>
/// <returns>Number of clients that received the message</returns>
public long Publish(string channel, string message)
{
    return Write(RedisCommands.Publish(channel, message));
}
  1. (1). 跳转到CSRedisCore/Internal/RedisCommand.csRedisCommands.Publish方法:
public static RedisInt Publish(string channel, string message)
{
    return new RedisInt("PUBLISH", channel, message);
}

这里没有问题。

(2). 跳转到CSRedisCore/RedisClient.Sync.csWrite泛型方法:

internal T Write<T>(RedisCommand<T> command)
{
   if (_transaction.Active)
    //可疑点
    return _transaction.Write(command);
    else if (_monitor.Listening)
        return default(T);
    else if (_streaming)
    {
        //可疑点
        _connector.Write(command);
        return default(T);
    }
    else
        //可疑点
        return _connector.Call(command);
}
  1. (1). 跳转到CSRedisCore/Internal/RedisTransaction.cs_transaction.Write泛型方法:
public T Write<T>(RedisCommand<T> command)
{
    string response = _connector.Call(RedisCommands.AsTransaction(command));
    OnTransactionQueued(command, response);

    _execCommand.AddParser(x => command.Parse(x));
    return default(T);
}

(2). 跳转到CSRedisCore/Internal/RedisConnector.cs_connector.Call泛型方法:

public T Call<T>(RedisCommand<T> command)
{
    ConnectIfNotConnected();

    try
    {
        if (IsPipelined)
            //可疑点
            return _io.Pipeline.Write(command);
        
        //可疑点
        _io.Write(_io.Writer.Prepare(command));
        //可疑点
        return command.Parse(_io.Reader);
    }
    catch (IOException)
    {
        if (ReconnectAttempts == 0)
            throw;
        Reconnect();
        return Call(command);
    }
    catch (RedisException ex)
    {
        throw new RedisException($"{ex.Message}\r\nCommand: {command}", ex);
    }
}

(3). 跳转到CSRedisCore/Internal/RedisConnector.cs_connector.Write方法:

public void Write(RedisCommand command)
{
    ConnectIfNotConnected();

    try
    {
        //可疑点
        _io.Write(_io.Writer.Prepare(command));
    }
    catch (IOException)
    {
        if (ReconnectAttempts == 0)
            throw;
        Reconnect();
        Write(command);
    }
}

步骤(2)、(3)都包含_io.Write(_io.Writer.Prepare(command));

  1. (1). 跳转到CSRedisCore/Internal/IO/RedisWriter.cs_io.Writer.Prepare方法:
public byte[] Prepare(RedisCommand command) {
    var parts = command.Command.Split(' ');
    int length = parts.Length + command.Arguments.Length;
    StringBuilder sb = new StringBuilder();
    sb.Append(MultiBulk).Append(length).Append(EOL);

    foreach(var part in parts) sb.Append(Bulk).Append(_io.Encoding.GetByteCount(part)).Append(EOL).Append(part).Append(EOL);

    MemoryStream ms = new MemoryStream();
    var data = _io.Encoding.GetBytes(sb.ToString());
    ms.Write(data, 0, data.Length);

    foreach(var arg in command.Arguments) {
        if (arg != null && arg.GetType() == typeof(byte[])) {
            data = arg as byte[];
            var data2 = _io.Encoding.GetBytes($ "{Bulk}{data.Length}{EOL}");
            ms.Write(data2, 0, data2.Length);
            ms.Write(data, 0, data.Length);
            ms.Write(new byte[] {
                13,
                10
            },
            0, 2);
        } else {
            string str = String.Format(CultureInfo.InvariantCulture, "{0}", arg);
            data = _io.Encoding.GetBytes($ "{Bulk}{_io.Encoding.GetByteCount(str)}{EOL}{str}{EOL}");
            ms.Write(data, 0, data.Length);
        }
    }

    return ms.ToArray();
}

(2). 跳转到CSRedisCore/Internal/IO/RedisIO.cs_io.Write方法:

public void Write(byte[] data)
{
    lock (_streamLock)
    {
        Stream.Write(data, 0, data.Length);
        Stream.Flush();
    }
}

会不会是这里的锁有问题呢?

-- 未完 --

if (IsPipelined)
    return _io.Pipeline.Write(command);
return command.Parse(_io.Reader);

5.(3)上述源码还没看,明天早起做核酸,先到这里吧。