
Fix: Resolve Memory Leak in Parser Buffer Management #3531


Draft · wants to merge 2 commits into master

Conversation

wpietrucha

@wpietrucha wpietrucha commented Aug 20, 2025

Fix: Resolve Memory Leak in Parser Buffer Management

Problem

The Parser class in pg-protocol had a significant memory leak caused by improper buffer management. Large buffers would remain in memory indefinitely, even after their data was processed, leading to steadily increasing memory usage in long-running applications.

Here is what we observed in our application:

compressed-memory-leak-small.mov

Root Cause

When the parser processes messages and has remaining partial data, it would only adjust buffer cursors (this.bufferOffset) instead of actually shrinking the buffer. This meant that if a very large message (e.g., 1MB) arrived, the internal buffer would grow to accommodate it but never shrink back, even if only a few bytes were still needed.

// BEFORE (problematic code)
} else {
  // Adjust the cursors of remainingBuffer
  this.bufferLength = bufferFullLength - offset
  this.bufferOffset = offset  // ❌ Still holds reference to huge buffer
}

Result: The parser could hold references to very large buffers indefinitely, preventing garbage collection.

Solution

When partial messages remain, create a new appropriately-sized buffer and copy only the remaining data:

// AFTER (fixed code)
} else {
  // A partial message remains.
  // Create a new, smaller buffer and copy only the remaining data into it.
  // This breaks the reference to the original, potentially huge buffer.
  const remainingLength = bufferFullLength - offset
  const newBuffer = Buffer.allocUnsafe(remainingLength)
  this.buffer.copy(newBuffer, 0, offset, offset + remainingLength)

  this.buffer = newBuffer
  this.bufferOffset = 0
  this.bufferLength = remainingLength
}

Memory Management Flow

Before Fix:

Large buffer arrives (1MB) → Parser grows internal buffer → Processes most data → 
Keeps entire 1MB buffer for 64 bytes of remaining data → Memory leak

After Fix:

Large buffer arrives (1MB) → Parser grows internal buffer → Processes most data → 
Creates new 64-byte buffer for remaining data → Old 1MB buffer eligible for GC → Memory freed ✅

Impact

  • Eliminates memory leaks in long-running applications
  • Reduces memory pressure by allowing proper garbage collection of large buffers
  • No breaking changes - identical public API and behavior
  • Minimal performance overhead - only affects the uncommon case of partial message handling

Testing

The fix maintains full backward compatibility and all existing tests should pass. The memory management improvements will be most noticeable in applications that:

  • Handle large messages
  • Run for extended periods
  • Process high-volume data streams

Files Changed:

  • packages/pg-protocol/src/parser.ts - Fixed buffer shrinking logic to prevent memory leaks

@wpietrucha
Author

@regevbr , I noticed that you are the author of the initial code here. Could you take a look at this change?
I noticed that you made significant performance improvements with your change in #2241, and I want to make sure we don't lose that :) Could you recommend a good way to verify this PR in that context (other ideas are also welcome :))?

@regevbr
Contributor

regevbr commented Aug 20, 2025

I think your example does not cover the situation here.

Once a buffer grows, the only way for it to get smaller is in a very specific case where parsing ends and there is no more data to be parsed in the buffer. I guess this case doesn't happen for you since the connection is never "resting". Also, since you probably have a lot of data coming in on the stream and the processing is slower, or the data is very big, the buffer gets bigger and bigger.

Your solution will indeed shrink the buffer, but it can potentially reintroduce the issues that were fixed in my PR, since you will cause a lot of synchronous buffer allocations over and over again.

I can suggest 2 things:

  1. Not directly related, but before merging the buffers, it might be worth trying a more complex parsing logic that iterates over both buffers and, once done, decides whether we need to merge the buffers or can just use the new buffer with an offset.
  2. Instead of shrinking the buffer at the end of every parse loop, we can be more sophisticated about it. I suggest we only shrink it if it is at least half empty, and in that case shrink it to half its size. The reason we don't shrink to the exact size is that we will almost certainly need to extend it in the next parse iteration, so I'd rather keep that wiggle room for the next buffer (unless we implement (1), in which case this becomes irrelevant).
    If the buffer is very big and suddenly becomes 95% empty, it will be reduced by half on every parse call until suddenly much more data comes in again.

Can you at least try approach 2 locally and see if it resolves it for you?

What are your thoughts on my approach?

In addition, can you add a bench test that recreates your scenario, and then make sure that your fix works on it?

@brianc, your input is also needed here, please

@hjr3
Collaborator

hjr3 commented Aug 20, 2025

In addition, can you add a bench test that recreates your scenario, and then make sure that your fix works on it?

I came here to say this as well. Even a small reproducible script would be helpful to understand the behavior you are seeing.

Also, some prior art for benchmarking: https://github.com/hjr3/node-postgres/blob/b1609b5732515f07cac32f699fbdb3d4f63d852d/packages/pg-bench/src/binary-format.ts

@brianc
Owner

brianc commented Aug 20, 2025

I recall intentionally doing this, or something similar, in the past for performance reasons. I don't consider it a memory leak for the parser's internal memory usage to grow to the largest buffer received while the parser is allocated. It won't grow infinitely, so I don't think it's a "leak." It's just a space/time trade-off: "take more space to save time having to realloc memory more often." I would like to see benchmarks/numbers around the trade-off to see what we give up performance-wise going the other way.

@wpietrucha wpietrucha force-pushed the fix/parser-memory-leak branch from 74db528 to e23b8f5 Compare August 20, 2025 12:11
Fix buffer shrinking: create new appropriately-sized buffer instead of just adjusting pointers when partial messages remain.

This allows garbage collection of large processed buffers that were previously kept alive due to cursor-based buffer management.

Resolves memory leak where large buffers would remain in memory indefinitely even when only a few bytes of data were still needed.
@wpietrucha wpietrucha force-pushed the fix/parser-memory-leak branch from e23b8f5 to 34b5c51 Compare August 20, 2025 12:12
Implement sophisticated buffer management that only shrinks when buffer is more than half empty, and reduces to half size (not exact size) to provide wiggle room for incoming data.

This prevents both memory leaks from oversized buffers and performance issues from excessive reallocations in high-throughput scenarios.

- Only shrink when buffer utilization < 50%
- When shrinking, reduce to max(half current size, 2x remaining data)
- Gradual reduction allows large buffers to shrink progressively
- Falls back to cursor strategy when buffer utilization is reasonable
@wpietrucha wpietrucha force-pushed the fix/parser-memory-leak branch from c161069 to 68763e8 Compare August 20, 2025 12:31
@wpietrucha
Author

Sure, thank you for the comments.
@regevbr , do you mean something like this:
68763e8 (#3531)
?

I came here to say this as well. Even a small reproducible script would be helpful to understand the behavior you are seeing.

@hjr3 , sure, I will try.

@regevbr
Contributor

regevbr commented Aug 20, 2025

I actually took the chance to have a vibe coding challenge. I used Claude and it took 55 versions to get here, haha.
I took approach 1 and skipped the shrinking, as it is not needed anymore.
The parsing will initially take place using a "virtual buffer" without merging the buffers, and then either merge the buffers if needed or use the new buffer, offset properly. Thus in many cases we might not even need to make any allocations. But in cases where a large amount of data is coming in, it will behave the same as today, and once the big data is processed, it will shrink as soon as it can by utilizing the next buffer that comes in.
I still need to think about what can happen if we only get huge amounts of data one after the other, though... I want to try another approach that doesn't involve allocation at all. :-) If that doesn't succeed, I think we can make the approach below better by allocating enough space for the rest of the message, since we might already know how long the message will be.
I'm not sure this won't make things worse, but I'm pretty sure it will at least not make them much worse, and it will definitely improve your scenario.
I would appreciate everyone's eyes on this. Bear in mind that I didn't compile or test it, I just used Claude on the web. This needs more personal touches and major testing.

import { TransformOptions } from 'stream'
import {
  Mode,
  bindComplete,
  parseComplete,
  closeComplete,
  noData,
  portalSuspended,
  copyDone,
  replicationStart,
  emptyQuery,
  ReadyForQueryMessage,
  CommandCompleteMessage,
  CopyDataMessage,
  CopyResponse,
  NotificationResponseMessage,
  RowDescriptionMessage,
  ParameterDescriptionMessage,
  Field,
  DataRowMessage,
  ParameterStatusMessage,
  BackendKeyDataMessage,
  DatabaseError,
  BackendMessage,
  MessageName,
  AuthenticationMD5Password,
  NoticeMessage,
} from './messages'
import { BufferReader } from './buffer-reader'

// every message is prefixed with a single byte
const CODE_LENGTH = 1
// every message has an int32 length which includes itself but does
// NOT include the code in the length
const LEN_LENGTH = 4
const HEADER_LENGTH = CODE_LENGTH + LEN_LENGTH
const UINT32_SIZE = 4

export type Packet = {
  code: number
  packet: Buffer
}

const emptyBuffer = Buffer.allocUnsafe(0)

type StreamOptions = TransformOptions & {
  mode: Mode
}

const enum MessageCodes {
  DataRow = 0x44, // D
  ParseComplete = 0x31, // 1
  BindComplete = 0x32, // 2
  CloseComplete = 0x33, // 3
  CommandComplete = 0x43, // C
  ReadyForQuery = 0x5a, // Z
  NoData = 0x6e, // n
  NotificationResponse = 0x41, // A
  AuthenticationResponse = 0x52, // R
  ParameterStatus = 0x53, // S
  BackendKeyData = 0x4b, // K
  ErrorMessage = 0x45, // E
  NoticeMessage = 0x4e, // N
  RowDescriptionMessage = 0x54, // T
  ParameterDescriptionMessage = 0x74, // t
  PortalSuspended = 0x73, // s
  ReplicationStart = 0x57, // W
  EmptyQuery = 0x49, // I
  CopyIn = 0x47, // G
  CopyOut = 0x48, // H
  CopyDone = 0x63, // c
  CopyData = 0x64, // d
}

export type MessageCallback = (msg: BackendMessage) => void

interface ParseResultMerge {
  needsMerge: true
  virtualOffset: number
}

interface ParseResultNoMerge {
  needsMerge: false
  remainingBuffer: Buffer
  remainingOffset: number
  remainingLength: number
}

type ParseResult = ParseResultMerge | ParseResultNoMerge

interface BufferLocation {
  buffer: Buffer
  offset: number
}

/**
 * VirtualBuffer provides a unified view over two separate buffers,
 * allowing seamless reading across buffer boundaries
 */
class VirtualBuffer {
  constructor(
    private oldBuffer: Buffer,
    private oldOffset: number,
    private oldLength: number,
    private newBuffer: Buffer
  ) {}

  get totalLength(): number {
    return this.oldLength + this.newBuffer.length
  }

  /**
   * Read a single byte at a virtual offset
   */
  readByteAt(virtualOffset: number): number {
    if (virtualOffset < this.oldLength) {
      return this.oldBuffer[this.oldOffset + virtualOffset]
    } else {
      return this.newBuffer[virtualOffset - this.oldLength]
    }
  }

  /**
   * Read a UInt32BE at a virtual offset, potentially spanning both buffers
   */
  readUInt32BEAt(virtualOffset: number): number {
    // If the entire UInt32 is in the old buffer
    if (virtualOffset + UINT32_SIZE <= this.oldLength) {
      return this.oldBuffer.readUInt32BE(this.oldOffset + virtualOffset)
    }

    // If the entire UInt32 is in the new buffer
    if (virtualOffset >= this.oldLength) {
      return this.newBuffer.readUInt32BE(virtualOffset - this.oldLength)
    }

    // UInt32 spans both buffers - create a temporary buffer and use readUInt32BE
    const tempBuffer = Buffer.allocUnsafe(UINT32_SIZE)
    for (let i = 0; i < UINT32_SIZE; i++) {
      tempBuffer[i] = this.readByteAt(virtualOffset + i)
    }
    return tempBuffer.readUInt32BE(0)
  }

  /**
   * Get buffer location for data, extracting into a new buffer if it spans both buffers
   * Always returns a BufferLocation with the appropriate buffer and offset
   */
  getBufferLocation(virtualOffset: number, length: number): BufferLocation {
    // Data entirely in old buffer
    if (virtualOffset + length <= this.oldLength) {
      return {
        buffer: this.oldBuffer,
        offset: this.oldOffset + virtualOffset
      }
    }

    // Data entirely in new buffer
    if (virtualOffset >= this.oldLength) {
      return {
        buffer: this.newBuffer,
        offset: virtualOffset - this.oldLength
      }
    }

    // Data spans both buffers - extract into a new buffer
    const oldBufferPart = this.oldLength - virtualOffset
    const newBufferPart = length - oldBufferPart

    const extractedBuffer = Buffer.allocUnsafe(length)
    this.oldBuffer.copy(extractedBuffer, 0, this.oldOffset + virtualOffset, this.oldOffset + this.oldLength)
    this.newBuffer.copy(extractedBuffer, oldBufferPart, 0, newBufferPart)

    return {
      buffer: extractedBuffer,
      offset: 0
    }
  }

  /**
   * Calculate remaining data after consuming up to virtualOffset
   */
  calculateRemaining(virtualOffset: number): ParseResult {
    const remainingLength = this.totalLength - virtualOffset

    if (remainingLength === 0) {
      // Nothing left
      return {
        needsMerge: false,
        remainingBuffer: emptyBuffer,
        remainingOffset: 0,
        remainingLength: 0
      }
    }

    // If all remaining data is in the new buffer
    if (virtualOffset >= this.oldLength) {
      const newOffset = virtualOffset - this.oldLength
      return {
        needsMerge: false,
        remainingBuffer: this.newBuffer,
        remainingOffset: newOffset,
        remainingLength: remainingLength
      }
    }

    // Remaining data spans both buffers - need to merge
    return {
      needsMerge: true,
      virtualOffset
    }
  }
}

export class Parser {
  private buffer: Buffer = emptyBuffer
  private bufferLength: number = 0
  private bufferOffset: number = 0
  private reader = new BufferReader()
  private mode: Mode
  
  constructor(opts?: StreamOptions) {
    if (opts?.mode === 'binary') {
      throw new Error('Binary mode not supported yet')
    }
    this.mode = opts?.mode || 'text'
  }
  
  public parse(buffer: Buffer, callback: MessageCallback) {
    const parseResult = this.parseVirtualBuffer(buffer, callback)
    
    if (parseResult.needsMerge) {
      this.updateBufferAfterParsing(parseResult.virtualOffset)
      this.mergeBuffer(buffer)
    } else {
      this.setRemainingBuffer(parseResult)
    }
  }

  private updateBufferAfterParsing(consumedBytes: number): void {
    this.bufferOffset += consumedBytes
    this.bufferLength -= consumedBytes
  }

  private setRemainingBuffer(parseResult: ParseResultNoMerge): void {
    this.buffer = parseResult.remainingBuffer
    this.bufferOffset = parseResult.remainingOffset
    this.bufferLength = parseResult.remainingLength
  }
  
  private parseVirtualBuffer(newBuffer: Buffer, callback: MessageCallback): ParseResult {
    const virtualBuffer = new VirtualBuffer(this.buffer, this.bufferOffset, this.bufferLength, newBuffer)
    let virtualOffset = 0
    
    while (virtualOffset + HEADER_LENGTH <= virtualBuffer.totalLength) {
      const messageInfo = this.readMessageHeader(virtualBuffer, virtualOffset)
      const fullMessageLength = CODE_LENGTH + messageInfo.length
      
      if (virtualOffset + fullMessageLength <= virtualBuffer.totalLength) {
        this.processCompleteMessage(virtualBuffer, virtualOffset, messageInfo, callback)
        virtualOffset += fullMessageLength
      } else {
        break // Incomplete message, stop parsing
      }
    }
    
    return virtualBuffer.calculateRemaining(virtualOffset)
  }

  private readMessageHeader(virtualBuffer: VirtualBuffer, virtualOffset: number): { code: number; length: number } {
    const code = virtualBuffer.readByteAt(virtualOffset)
    const length = virtualBuffer.readUInt32BEAt(virtualOffset + CODE_LENGTH)
    return { code, length }
  }

  private processCompleteMessage(
    virtualBuffer: VirtualBuffer,
    virtualOffset: number,
    messageInfo: { code: number; length: number },
    callback: MessageCallback
  ): void {
    const messageStartOffset = virtualOffset + HEADER_LENGTH
    const bufferLocation = virtualBuffer.getBufferLocation(messageStartOffset, messageInfo.length)
    const message = this.handlePacket(bufferLocation.offset, messageInfo.code, messageInfo.length, bufferLocation.buffer)
    callback(message)
  }
  
  private mergeBuffer(buffer: Buffer): void {
    if (this.bufferLength > 0) {
      const newLength = this.bufferLength + buffer.byteLength
      const newFullLength = newLength + this.bufferOffset
      
      if (newFullLength > this.buffer.byteLength) {
        // We can't concat the new buffer with the remaining one
        let newBuffer: Buffer
        
        if (newLength <= this.buffer.byteLength && this.bufferOffset >= this.bufferLength) {
          // We can move the relevant part to the beginning of the buffer instead of allocating a new buffer
          newBuffer = this.buffer
        } else {
          // Allocate a new larger buffer
          let newBufferLength = this.buffer.byteLength * 2
          while (newLength >= newBufferLength) {
            newBufferLength *= 2
          }
          newBuffer = Buffer.allocUnsafe(newBufferLength)
        }
        
        // Move the remaining buffer to the new one
        this.buffer.copy(newBuffer, 0, this.bufferOffset, this.bufferOffset + this.bufferLength)
        this.buffer = newBuffer
        this.bufferOffset = 0
      }
      
      // Concat the new buffer with the remaining one
      buffer.copy(this.buffer, this.bufferOffset + this.bufferLength)
      this.bufferLength = newLength
    } else {
      this.buffer = buffer
      this.bufferOffset = 0
      this.bufferLength = buffer.byteLength
    }
  }

  private handlePacket(offset: number, code: number, length: number, bytes: Buffer): BackendMessage {
    switch (code) {
      case MessageCodes.BindComplete:
        return bindComplete
      case MessageCodes.ParseComplete:
        return parseComplete
      case MessageCodes.CloseComplete:
        return closeComplete
      case MessageCodes.NoData:
        return noData
      case MessageCodes.PortalSuspended:
        return portalSuspended
      case MessageCodes.CopyDone:
        return copyDone
      case MessageCodes.ReplicationStart:
        return replicationStart
      case MessageCodes.EmptyQuery:
        return emptyQuery
      case MessageCodes.DataRow:
        return this.parseDataRowMessage(offset, length, bytes)
      case MessageCodes.CommandComplete:
        return this.parseCommandCompleteMessage(offset, length, bytes)
      case MessageCodes.ReadyForQuery:
        return this.parseReadyForQueryMessage(offset, length, bytes)
      case MessageCodes.NotificationResponse:
        return this.parseNotificationMessage(offset, length, bytes)
      case MessageCodes.AuthenticationResponse:
        return this.parseAuthenticationResponse(offset, length, bytes)
      case MessageCodes.ParameterStatus:
        return this.parseParameterStatusMessage(offset, length, bytes)
      case MessageCodes.BackendKeyData:
        return this.parseBackendKeyData(offset, length, bytes)
      case MessageCodes.ErrorMessage:
        return this.parseErrorMessage(offset, length, bytes, 'error')
      case MessageCodes.NoticeMessage:
        return this.parseErrorMessage(offset, length, bytes, 'notice')
      case MessageCodes.RowDescriptionMessage:
        return this.parseRowDescriptionMessage(offset, length, bytes)
      case MessageCodes.ParameterDescriptionMessage:
        return this.parseParameterDescriptionMessage(offset, length, bytes)
      case MessageCodes.CopyIn:
        return this.parseCopyInMessage(offset, length, bytes)
      case MessageCodes.CopyOut:
        return this.parseCopyOutMessage(offset, length, bytes)
      case MessageCodes.CopyData:
        return this.parseCopyData(offset, length, bytes)
      default:
        return new DatabaseError('received invalid response: ' + code.toString(16), length, 'error')
    }
  }
  
  private parseReadyForQueryMessage(offset: number, length: number, bytes: Buffer) {
    this.reader.setBuffer(offset, bytes)
    const status = this.reader.string(1)
    return new ReadyForQueryMessage(length, status)
  }
  
  private parseCommandCompleteMessage(offset: number, length: number, bytes: Buffer) {
    this.reader.setBuffer(offset, bytes)
    const text = this.reader.cstring()
    return new CommandCompleteMessage(length, text)
  }
  
  private parseCopyData(offset: number, length: number, bytes: Buffer) {
    const chunk = bytes.slice(offset, offset + (length - 4))
    return new CopyDataMessage(length, chunk)
  }
  
  private parseCopyInMessage(offset: number, length: number, bytes: Buffer) {
    return this.parseCopyMessage(offset, length, bytes, 'copyInResponse')
  }
  
  private parseCopyOutMessage(offset: number, length: number, bytes: Buffer) {
    return this.parseCopyMessage(offset, length, bytes, 'copyOutResponse')
  }
  
  private parseCopyMessage(offset: number, length: number, bytes: Buffer, messageName: MessageName) {
    this.reader.setBuffer(offset, bytes)
    const isBinary = this.reader.byte() !== 0
    const columnCount = this.reader.int16()
    const message = new CopyResponse(length, messageName, isBinary, columnCount)
    for (let i = 0; i < columnCount; i++) {
      message.columnTypes[i] = this.reader.int16()
    }
    return message
  }
  
  private parseNotificationMessage(offset: number, length: number, bytes: Buffer) {
    this.reader.setBuffer(offset, bytes)
    const processId = this.reader.int32()
    const channel = this.reader.cstring()
    const payload = this.reader.cstring()
    return new NotificationResponseMessage(length, processId, channel, payload)
  }
  
  private parseRowDescriptionMessage(offset: number, length: number, bytes: Buffer) {
    this.reader.setBuffer(offset, bytes)
    const fieldCount = this.reader.int16()
    const message = new RowDescriptionMessage(length, fieldCount)
    for (let i = 0; i < fieldCount; i++) {
      message.fields[i] = this.parseField()
    }
    return message
  }
  
  private parseField(): Field {
    const name = this.reader.cstring()
    const tableID = this.reader.uint32()
    const columnID = this.reader.int16()
    const dataTypeID = this.reader.uint32()
    const dataTypeSize = this.reader.int16()
    const dataTypeModifier = this.reader.int32()
    const mode = this.reader.int16() === 0 ? 'text' : 'binary'
    return new Field(name, tableID, columnID, dataTypeID, dataTypeSize, dataTypeModifier, mode)
  }
  
  private parseParameterDescriptionMessage(offset: number, length: number, bytes: Buffer) {
    this.reader.setBuffer(offset, bytes)
    const parameterCount = this.reader.int16()
    const message = new ParameterDescriptionMessage(length, parameterCount)
    for (let i = 0; i < parameterCount; i++) {
      message.dataTypeIDs[i] = this.reader.int32()
    }
    return message
  }
  
  private parseDataRowMessage(offset: number, length: number, bytes: Buffer) {
    this.reader.setBuffer(offset, bytes)
    const fieldCount = this.reader.int16()
    const fields: any[] = new Array(fieldCount)
    for (let i = 0; i < fieldCount; i++) {
      const len = this.reader.int32()
      // a -1 for length means the value of the field is null
      fields[i] = len === -1 ? null : this.reader.string(len)
    }
    return new DataRowMessage(length, fields)
  }
  
  private parseParameterStatusMessage(offset: number, length: number, bytes: Buffer) {
    this.reader.setBuffer(offset, bytes)
    const name = this.reader.cstring()
    const value = this.reader.cstring()
    return new ParameterStatusMessage(length, name, value)
  }
  
  private parseBackendKeyData(offset: number, length: number, bytes: Buffer) {
    this.reader.setBuffer(offset, bytes)
    const processID = this.reader.int32()
    const secretKey = this.reader.int32()
    return new BackendKeyDataMessage(length, processID, secretKey)
  }
  
  public parseAuthenticationResponse(offset: number, length: number, bytes: Buffer) {
    this.reader.setBuffer(offset, bytes)
    const code = this.reader.int32()
    // TODO(bmc): maybe better types here
    const message: BackendMessage & any = {
      name: 'authenticationOk',
      length,
    }
    switch (code) {
      case 0: // AuthenticationOk
        break
      case 3: // AuthenticationCleartextPassword
        if (message.length === 8) {
          message.name = 'authenticationCleartextPassword'
        }
        break
      case 5: // AuthenticationMD5Password
        if (message.length === 12) {
          message.name = 'authenticationMD5Password'
          const salt = this.reader.bytes(4)
          return new AuthenticationMD5Password(length, salt)
        }
        break
      case 10: // AuthenticationSASL
        {
          message.name = 'authenticationSASL'
          message.mechanisms = []
          let mechanism: string
          do {
            mechanism = this.reader.cstring()
            if (mechanism) {
              message.mechanisms.push(mechanism)
            }
          } while (mechanism)
        }
        break
      case 11: // AuthenticationSASLContinue
        message.name = 'authenticationSASLContinue'
        message.data = this.reader.string(length - 8)
        break
      case 12: // AuthenticationSASLFinal
        message.name = 'authenticationSASLFinal'
        message.data = this.reader.string(length - 8)
        break
      default:
        throw new Error('Unknown authenticationOk message type ' + code)
    }
    return message
  }
  
  private parseErrorMessage(offset: number, length: number, bytes: Buffer, name: MessageName) {
    this.reader.setBuffer(offset, bytes)
    const fields: Record<string, string> = {}
    let fieldType = this.reader.string(1)
    while (fieldType !== '\0') {
      fields[fieldType] = this.reader.cstring()
      fieldType = this.reader.string(1)
    }
    const messageValue = fields.M
    const message =
      name === 'notice' ? new NoticeMessage(length, messageValue) : new DatabaseError(messageValue, length, name)
    message.severity = fields.S
    message.code = fields.C
    message.detail = fields.D
    message.hint = fields.H
    message.position = fields.P
    message.internalPosition = fields.p
    message.internalQuery = fields.q
    message.where = fields.W
    message.schema = fields.s
    message.table = fields.t
    message.column = fields.c
    message.dataType = fields.d
    message.constraint = fields.n
    message.file = fields.F
    message.line = fields.L
    message.routine = fields.R
    return message
  }
}

@wpietrucha
Author

@hjr3 , thank you for referencing the benchmarks - they were very useful.
I've created a dedicated benchmark (@regevbr - maybe you can use it for iterating on your ideas) that shows the problem and the impact of the solution. You can see the results in the PR description:
wpietrucha#1

@regevbr , I've also run the benchmark against your solution - please take a look at the results.

@regevbr
Contributor

regevbr commented Aug 20, 2025

Awesome, I created another version in which we don't merge buffers at all, only when extracting concrete data out of them (like strings). Can you benchmark it as well, please?

import { TransformOptions } from 'stream'
import {
    Mode,
    bindComplete,
    parseComplete,
    closeComplete,
    noData,
    portalSuspended,
    copyDone,
    replicationStart,
    emptyQuery,
    ReadyForQueryMessage,
    CommandCompleteMessage,
    CopyDataMessage,
    CopyResponse,
    NotificationResponseMessage,
    RowDescriptionMessage,
    ParameterDescriptionMessage,
    Field,
    DataRowMessage,
    ParameterStatusMessage,
    BackendKeyDataMessage,
    DatabaseError,
    BackendMessage,
    MessageName,
    AuthenticationMD5Password,
    NoticeMessage,
} from './messages'
import { BufferReader } from './buffer-reader'

// every message is prefixed with a single byte
const CODE_LENGTH = 1
// every message has an int32 length which includes itself but does
// NOT include the code in the length
const LEN_LENGTH = 4
const HEADER_LENGTH = CODE_LENGTH + LEN_LENGTH
const UINT32_SIZE = 4

export type Packet = {
    code: number
    packet: Buffer
}

const emptyBuffer = Buffer.allocUnsafe(0)

type StreamOptions = TransformOptions & {
    mode: Mode
}

const enum MessageCodes {
    DataRow = 0x44, // D
    ParseComplete = 0x31, // 1
    BindComplete = 0x32, // 2
    CloseComplete = 0x33, // 3
    CommandComplete = 0x43, // C
    ReadyForQuery = 0x5a, // Z
    NoData = 0x6e, // n
    NotificationResponse = 0x41, // A
    AuthenticationResponse = 0x52, // R
    ParameterStatus = 0x53, // S
    BackendKeyData = 0x4b, // K
    ErrorMessage = 0x45, // E
    NoticeMessage = 0x4e, // N
    RowDescriptionMessage = 0x54, // T
    ParameterDescriptionMessage = 0x74, // t
    PortalSuspended = 0x73, // s
    ReplicationStart = 0x57, // W
    EmptyQuery = 0x49, // I
    CopyIn = 0x47, // G
    CopyOut = 0x48, // H
    CopyDone = 0x63, // c
    CopyData = 0x64, // d
}

export type MessageCallback = (msg: BackendMessage) => void

interface BufferChunk {
    buffer: Buffer
    offset: number
    length: number
}

/**
 * VirtualBuffer provides a unified view over multiple buffers without merging them
 * Implements the same interface as BufferReader for compatibility
 */
class VirtualBuffer {
    private chunks: BufferChunk[] = []
    private _totalLength = 0
    private currentPosition = 0
    private encoding: string = 'utf-8'

    constructor(chunks: BufferChunk[] = []) {
        this.chunks = chunks.slice()
        this._totalLength = chunks.reduce((sum, chunk) => sum + chunk.length, 0)
    }

    get totalLength(): number {
        return this._totalLength
    }

    get position(): number {
        return this.currentPosition
    }

    /**
     * Add a new buffer chunk to the virtual buffer
     */
    addChunk(buffer: Buffer, offset: number = 0, length: number = buffer.length - offset): void {
        if (length > 0) {
            this.chunks.push({ buffer, offset, length })
            this._totalLength += length
        }
    }

    /**
     * Peek at a byte at the current position without advancing
     */
    peekByte(): number {
        if (this.currentPosition >= this._totalLength) {
            throw new Error('Buffer overflow: attempted to peek beyond buffer end')
        }
        return this.readByteAt(this.currentPosition)
    }

    /**
     * Check if we can read a certain number of bytes from current position
     */
    canRead(bytes: number): boolean {
        return this.currentPosition + bytes <= this._totalLength
    }

    // BufferReader compatible methods

    /**
     * Read a single byte at the current position and advance
     */
    byte(): number {
        if (this.currentPosition >= this._totalLength) {
            throw new Error('Buffer overflow: attempted to read beyond buffer end')
        }

        const byte = this.readByteAt(this.currentPosition)
        this.currentPosition++
        return byte
    }

    /**
     * Read a signed 16-bit integer (big endian) and advance
     */
    int16(): number {
        if (this.currentPosition + 2 > this._totalLength) {
            throw new Error('Buffer overflow: attempted to read beyond buffer end')
        }

        const value = this.readUInt16BEAt(this.currentPosition)
        this.currentPosition += 2
        // Convert unsigned to signed
        return value > 0x7FFF ? value - 0x10000 : value
    }

    /**
     * Read a signed 32-bit integer (big endian) and advance
     */
    int32(): number {
        if (this.currentPosition + UINT32_SIZE > this._totalLength) {
            throw new Error('Buffer overflow: attempted to read beyond buffer end')
        }

        const value = this.readUInt32BEAt(this.currentPosition)
        this.currentPosition += UINT32_SIZE
        // Convert unsigned to signed
        return value > 0x7FFFFFFF ? value - 0x100000000 : value
    }

    /**
     * Read an unsigned 32-bit integer (big endian) and advance
     */
    uint32(): number {
        if (this.currentPosition + UINT32_SIZE > this._totalLength) {
            throw new Error('Buffer overflow: attempted to read beyond buffer end')
        }

        const value = this.readUInt32BEAt(this.currentPosition)
        this.currentPosition += UINT32_SIZE
        return value
    }

    /**
     * Read a string of specified length and advance
     */
    string(length: number): string {
        const bytes = this.bytes(length)
        return bytes.toString(this.encoding)
    }

    /**
     * Read a null-terminated string and advance
     */
    cstring(): string {
        const start = this.currentPosition
        let end = start

        // Find the null terminator
        while (end < this._totalLength && this.readByteAt(end) !== 0) {
            end++
        }

        // Read the string content (without null terminator)
        const length = end - start
        const result = length > 0 ? this.string(length) : ''

        // Skip the null terminator if we found one
        if (end < this._totalLength) {
            this.currentPosition++ // Skip null byte
        }

        return result
    }

    /**
     * Read bytes at the current position and advance
     */
    bytes(length: number): Buffer {
        if (this.currentPosition + length > this._totalLength) {
            throw new Error('Buffer overflow: attempted to read beyond buffer end')
        }

        const result = this.getBytesAt(this.currentPosition, length)
        this.currentPosition += length
        return result
    }

    /**
     * Create a new VirtualBuffer from current position onwards
     */
    slice(start: number = this.currentPosition, end?: number): VirtualBuffer {
        const endPos = end ?? this._totalLength
        const newChunks: BufferChunk[] = []
        let virtualPos = 0

        for (const chunk of this.chunks) {
            const chunkEnd = virtualPos + chunk.length

            if (virtualPos >= endPos) break
            if (chunkEnd <= start) {
                virtualPos = chunkEnd
                continue
            }

            const chunkStart = Math.max(start - virtualPos, 0)
            const chunkActualEnd = Math.min(endPos - virtualPos, chunk.length)
            const newLength = chunkActualEnd - chunkStart

            if (newLength > 0) {
                newChunks.push({
                    buffer: chunk.buffer,
                    offset: chunk.offset + chunkStart,
                    length: newLength
                })
            }

            virtualPos = chunkEnd
        }

        return new VirtualBuffer(newChunks)
    }

    /**
     * Seek to a specific position
     */
    seek(position: number): void {
        if (position < 0 || position > this._totalLength) {
            throw new Error(`Invalid seek position: ${position}`)
        }
        this.currentPosition = position
    }

    /**
     * Remove consumed chunks to free memory
     */
    compact(): void {
        if (this.currentPosition === 0) return

        let remainingOffset = this.currentPosition
        const newChunks: BufferChunk[] = []

        for (const chunk of this.chunks) {
            if (remainingOffset >= chunk.length) {
                remainingOffset -= chunk.length
                continue
            }

            if (remainingOffset > 0) {
                newChunks.push({
                    buffer: chunk.buffer,
                    offset: chunk.offset + remainingOffset,
                    length: chunk.length - remainingOffset
                })
                remainingOffset = 0
            } else {
                newChunks.push(chunk)
            }
        }

        this.chunks = newChunks
        this._totalLength -= this.currentPosition
        this.currentPosition = 0
    }

    // Private helper methods
    private findChunkAndOffset(virtualOffset: number): { chunkIndex: number; offsetInChunk: number } | null {
        let currentOffset = 0

        for (let i = 0; i < this.chunks.length; i++) {
            const chunk = this.chunks[i]
            if (virtualOffset >= currentOffset && virtualOffset < currentOffset + chunk.length) {
                return {
                    chunkIndex: i,
                    offsetInChunk: virtualOffset - currentOffset
                }
            }
            currentOffset += chunk.length
        }

        return null
    }

    private readByteAt(virtualOffset: number): number {
        const location = this.findChunkAndOffset(virtualOffset)
        if (!location) {
            throw new Error(`Invalid offset: ${virtualOffset}`)
        }

        const chunk = this.chunks[location.chunkIndex]
        return chunk.buffer[chunk.offset + location.offsetInChunk]
    }

    private readUInt32BEAt(virtualOffset: number): number {
        const location = this.findChunkAndOffset(virtualOffset)
        if (!location) {
            throw new Error(`Invalid offset: ${virtualOffset}`)
        }

        const chunk = this.chunks[location.chunkIndex]

        // If the entire UInt32 fits in this chunk
        if (location.offsetInChunk + UINT32_SIZE <= chunk.length) {
            return chunk.buffer.readUInt32BE(chunk.offset + location.offsetInChunk)
        }

        // UInt32 spans multiple chunks - read byte by byte
        const tempBuffer = Buffer.allocUnsafe(UINT32_SIZE)
        for (let i = 0; i < UINT32_SIZE; i++) {
            tempBuffer[i] = this.readByteAt(virtualOffset + i)
        }
        return tempBuffer.readUInt32BE(0)
    }

    private readUInt16BEAt(virtualOffset: number): number {
        const location = this.findChunkAndOffset(virtualOffset)
        if (!location) {
            throw new Error(`Invalid offset: ${virtualOffset}`)
        }

        const chunk = this.chunks[location.chunkIndex]

        // If the entire UInt16 fits in this chunk
        if (location.offsetInChunk + 2 <= chunk.length) {
            return chunk.buffer.readUInt16BE(chunk.offset + location.offsetInChunk)
        }

        // UInt16 spans chunks - read byte by byte
        const tempBuffer = Buffer.allocUnsafe(2)
        for (let i = 0; i < 2; i++) {
            tempBuffer[i] = this.readByteAt(virtualOffset + i)
        }
        return tempBuffer.readUInt16BE(0)
    }

    private getBytesAt(virtualOffset: number, length: number): Buffer {
        if (length === 0) return emptyBuffer

        const location = this.findChunkAndOffset(virtualOffset)
        if (!location) {
            throw new Error(`Invalid offset: ${virtualOffset}`)
        }

        const startChunk = this.chunks[location.chunkIndex]

        // If all bytes fit in a single chunk, return a slice
        if (location.offsetInChunk + length <= startChunk.length) {
            return startChunk.buffer.subarray(
                startChunk.offset + location.offsetInChunk,
                startChunk.offset + location.offsetInChunk + length
            )
        }

        // Bytes span multiple chunks - copy into new buffer
        const result = Buffer.allocUnsafe(length)
        let resultOffset = 0
        let remainingLength = length
        let currentVirtualOffset = virtualOffset

        while (remainingLength > 0) {
            const loc = this.findChunkAndOffset(currentVirtualOffset)
            if (!loc) break

            const chunk = this.chunks[loc.chunkIndex]
            const availableInChunk = chunk.length - loc.offsetInChunk
            const toCopy = Math.min(remainingLength, availableInChunk)

            chunk.buffer.copy(
                result,
                resultOffset,
                chunk.offset + loc.offsetInChunk,
                chunk.offset + loc.offsetInChunk + toCopy
            )

            resultOffset += toCopy
            remainingLength -= toCopy
            currentVirtualOffset += toCopy
        }

        return result
    }
}

export class Parser {
    private virtualBuffer = new VirtualBuffer()
    private mode: Mode

    constructor(opts?: StreamOptions) {
        if (opts?.mode === 'binary') {
            throw new Error('Binary mode not supported yet')
        }
        this.mode = opts?.mode || 'text'
    }

    public parse(buffer: Buffer, callback: MessageCallback): void {
        // Add new buffer to virtual buffer
        this.virtualBuffer.addChunk(buffer)

        // Parse all complete messages
        while (this.virtualBuffer.canRead(HEADER_LENGTH)) {
            const startPosition = this.virtualBuffer.position

            // Read message header
            const code = this.virtualBuffer.byte()
            const length = this.virtualBuffer.uint32()
            const payloadLength = length - LEN_LENGTH

            // Check if we have the complete message
            if (!this.virtualBuffer.canRead(payloadLength)) {
                // Incomplete message, rewind and wait for more data
                this.virtualBuffer.seek(startPosition)
                break
            }

            // Create a slice for just this message's payload
            const messageBuffer = this.virtualBuffer.slice(
                this.virtualBuffer.position,
                this.virtualBuffer.position + payloadLength
            )

            // Advance past this message
            this.virtualBuffer.seek(this.virtualBuffer.position + payloadLength)

            // Process the message using the virtual buffer directly
            const message = this.handlePacketWithVirtualBuffer(code, length, messageBuffer)
            callback(message)
        }

        // Compact the virtual buffer to free consumed chunks
        this.virtualBuffer.compact()
    }

    private handlePacketWithVirtualBuffer(code: number, length: number, messageBuffer: VirtualBuffer): BackendMessage {
        switch (code) {
            case MessageCodes.BindComplete:
                return bindComplete
            case MessageCodes.ParseComplete:
                return parseComplete
            case MessageCodes.CloseComplete:
                return closeComplete
            case MessageCodes.NoData:
                return noData
            case MessageCodes.PortalSuspended:
                return portalSuspended
            case MessageCodes.CopyDone:
                return copyDone
            case MessageCodes.ReplicationStart:
                return replicationStart
            case MessageCodes.EmptyQuery:
                return emptyQuery
            case MessageCodes.DataRow:
                return this.parseDataRowMessage(length, messageBuffer)
            case MessageCodes.CommandComplete:
                return this.parseCommandCompleteMessage(length, messageBuffer)
            case MessageCodes.ReadyForQuery:
                return this.parseReadyForQueryMessage(length, messageBuffer)
            case MessageCodes.NotificationResponse:
                return this.parseNotificationMessage(length, messageBuffer)
            case MessageCodes.AuthenticationResponse:
                return this.parseAuthenticationResponse(length, messageBuffer)
            case MessageCodes.ParameterStatus:
                return this.parseParameterStatusMessage(length, messageBuffer)
            case MessageCodes.BackendKeyData:
                return this.parseBackendKeyData(length, messageBuffer)
            case MessageCodes.ErrorMessage:
                return this.parseErrorMessage(length, 'error', messageBuffer)
            case MessageCodes.NoticeMessage:
                return this.parseErrorMessage(length, 'notice', messageBuffer)
            case MessageCodes.RowDescriptionMessage:
                return this.parseRowDescriptionMessage(length, messageBuffer)
            case MessageCodes.ParameterDescriptionMessage:
                return this.parseParameterDescriptionMessage(length, messageBuffer)
            case MessageCodes.CopyIn:
                return this.parseCopyInMessage(length, messageBuffer)
            case MessageCodes.CopyOut:
                return this.parseCopyOutMessage(length, messageBuffer)
            case MessageCodes.CopyData:
                return this.parseCopyDataMessage(length, messageBuffer)
            default:
                return new DatabaseError('received invalid response: ' + code.toString(16), length, 'error')
        }
    }

    // Updated parsing methods that use VirtualBuffer directly
    private parseReadyForQueryMessage(length: number, reader: VirtualBuffer): ReadyForQueryMessage {
        const status = reader.string(1)
        return new ReadyForQueryMessage(length, status)
    }

    private parseCommandCompleteMessage(length: number, reader: VirtualBuffer): CommandCompleteMessage {
        const text = reader.cstring()
        return new CommandCompleteMessage(length, text)
    }

    private parseCopyDataMessage(length: number, reader: VirtualBuffer): CopyDataMessage {
        const chunk = reader.bytes(length - 4)
        return new CopyDataMessage(length, chunk)
    }

    private parseCopyInMessage(length: number, reader: VirtualBuffer): CopyResponse {
        return this.parseCopyMessage(length, 'copyInResponse', reader)
    }

    private parseCopyOutMessage(length: number, reader: VirtualBuffer): CopyResponse {
        return this.parseCopyMessage(length, 'copyOutResponse', reader)
    }

    private parseCopyMessage(length: number, messageName: MessageName, reader: VirtualBuffer): CopyResponse {
        const isBinary = reader.byte() !== 0
        const columnCount = reader.int16()
        const message = new CopyResponse(length, messageName, isBinary, columnCount)
        for (let i = 0; i < columnCount; i++) {
            message.columnTypes[i] = reader.int16()
        }
        return message
    }

    private parseNotificationMessage(length: number, reader: VirtualBuffer): NotificationResponseMessage {
        const processId = reader.int32()
        const channel = reader.cstring()
        const payload = reader.cstring()
        return new NotificationResponseMessage(length, processId, channel, payload)
    }

    private parseRowDescriptionMessage(length: number, reader: VirtualBuffer): RowDescriptionMessage {
        const fieldCount = reader.int16()
        const message = new RowDescriptionMessage(length, fieldCount)
        for (let i = 0; i < fieldCount; i++) {
            message.fields[i] = this.parseField(reader)
        }
        return message
    }

    private parseField(reader: VirtualBuffer): Field {
        const name = reader.cstring()
        const tableID = reader.uint32()
        const columnID = reader.int16()
        const dataTypeID = reader.uint32()
        const dataTypeSize = reader.int16()
        const dataTypeModifier = reader.int32()
        const mode = reader.int16() === 0 ? 'text' : 'binary'
        return new Field(name, tableID, columnID, dataTypeID, dataTypeSize, dataTypeModifier, mode)
    }

    private parseParameterDescriptionMessage(length: number, reader: VirtualBuffer): ParameterDescriptionMessage {
        const parameterCount = reader.int16()
        const message = new ParameterDescriptionMessage(length, parameterCount)
        for (let i = 0; i < parameterCount; i++) {
            message.dataTypeIDs[i] = reader.int32()
        }
        return message
    }

    private parseDataRowMessage(length: number, reader: VirtualBuffer): DataRowMessage {
        const fieldCount = reader.int16()
        const fields: any[] = new Array(fieldCount)
        for (let i = 0; i < fieldCount; i++) {
            const len = reader.int32()
            // a -1 for length means the value of the field is null
            fields[i] = len === -1 ? null : reader.string(len)
        }
        return new DataRowMessage(length, fields)
    }

    private parseParameterStatusMessage(length: number, reader: VirtualBuffer): ParameterStatusMessage {
        const name = reader.cstring()
        const value = reader.cstring()
        return new ParameterStatusMessage(length, name, value)
    }

    private parseBackendKeyData(length: number, reader: VirtualBuffer): BackendKeyDataMessage {
        const processID = reader.int32()
        const secretKey = reader.int32()
        return new BackendKeyDataMessage(length, processID, secretKey)
    }

    private parseAuthenticationResponse(length: number, reader: VirtualBuffer): BackendMessage {
        const code = reader.int32()
        const message: BackendMessage & any = {
            name: 'authenticationOk',
            length,
        }
        switch (code) {
            case 0: // AuthenticationOk
                break
            case 3: // AuthenticationCleartextPassword
                if (message.length === 8) {
                    message.name = 'authenticationCleartextPassword'
                }
                break
            case 5: // AuthenticationMD5Password
                if (message.length === 12) {
                    message.name = 'authenticationMD5Password'
                    const salt = reader.bytes(4)
                    return new AuthenticationMD5Password(length, salt)
                }
                break
            case 10: { // AuthenticationSASL
                message.name = 'authenticationSASL'
                message.mechanisms = []
                let mechanism: string
                do {
                    mechanism = reader.cstring()
                    if (mechanism) {
                        message.mechanisms.push(mechanism)
                    }
                } while (mechanism)
                break
            }
            case 11: // AuthenticationSASLContinue
                message.name = 'authenticationSASLContinue'
                message.data = reader.string(length - 8)
                break
            case 12: // AuthenticationSASLFinal
                message.name = 'authenticationSASLFinal'
                message.data = reader.string(length - 8)
                break
            default:
                throw new Error('Unknown authenticationOk message type ' + code)
        }
        return message
    }

    private parseErrorMessage(length: number, name: MessageName, reader: VirtualBuffer): BackendMessage {
        const fields: Record<string, string> = {}
        let fieldType = reader.string(1)
        while (fieldType !== '\0') {
            fields[fieldType] = reader.cstring()
            fieldType = reader.string(1)
        }
        const messageValue = fields.M
        const message =
            name === 'notice' ? new NoticeMessage(length, messageValue) : new DatabaseError(messageValue, length, name)
        message.severity = fields.S
        message.code = fields.C
        message.detail = fields.D
        message.hint = fields.H
        message.position = fields.P
        message.internalPosition = fields.p
        message.internalQuery = fields.q
        message.where = fields.W
        message.schema = fields.s
        message.table = fields.t
        message.column = fields.c
        message.dataType = fields.d
        message.constraint = fields.n
        message.file = fields.F
        message.line = fields.L
        message.routine = fields.R
        return message
    }
}
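As a quick illustration of the core trick the `VirtualBuffer` reader relies on, here is a self-contained sketch of reading a big-endian uint32 that straddles two chunks. The helper names (`byteAt`, `uint32At`) are illustrative, not part of this PR:

```typescript
interface Chunk {
    buffer: Buffer
    offset: number
    length: number
}

// Read the byte at a "virtual" offset across an array of chunks.
function byteAt(chunks: Chunk[], virtualOffset: number): number {
    let base = 0
    for (const c of chunks) {
        if (virtualOffset < base + c.length) {
            return c.buffer[c.offset + (virtualOffset - base)]
        }
        base += c.length
    }
    throw new Error(`Invalid offset: ${virtualOffset}`)
}

// Assemble a big-endian uint32 byte by byte so it may straddle chunk boundaries.
function uint32At(chunks: Chunk[], virtualOffset: number): number {
    const tmp = Buffer.allocUnsafe(4)
    for (let i = 0; i < 4; i++) {
        tmp[i] = byteAt(chunks, virtualOffset + i)
    }
    return tmp.readUInt32BE(0)
}

// A 4-byte value split 1 + 3 across two chunks.
const chunks: Chunk[] = [
    { buffer: Buffer.from([0x00, 0x01]), offset: 1, length: 1 },       // contributes 0x01
    { buffer: Buffer.from([0x02, 0x03, 0x04]), offset: 0, length: 3 }, // contributes 0x02 0x03 0x04
]

console.log(uint32At(chunks, 0).toString(16)) // 1020304
```

A single-chunk fast path, as in `readUInt32BEAt` above, avoids the byte-by-byte copy whenever the value does not cross a chunk boundary.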

@brianc
Owner

brianc commented Aug 20, 2025

I do like where this virtual buffer approach is headed. I think that's probably somewhat ideal - both good on space and doesn't allocate or copy any buffers. I think I wanted to take a stab at that long ago and didn't for whatever reason (probably laziness 😛 )

@regevbr
Contributor

regevbr commented Aug 20, 2025

We also need to benchmark the CPU utilization across all approaches (and compare them to the original code before my PR).

@wpietrucha
Author

@regevbr, I've re-run the benchmarks and checked the results: it looks like the first version is better.
I've posted the results here: wpietrucha#1 (review)

@regevbr
Contributor

regevbr commented Aug 20, 2025

Thanks. Exciting, I wonder how that can even be... @brianc thoughts? The fact that my first version is slightly worse than @wpietrucha's is also mind-boggling

@brianc
Owner

brianc commented Aug 20, 2025

Thanks. Exciting, I wonder how that can even be... @brianc thoughts? The fact that my first version is slightly worse than @wpietrucha's is also mind-boggling

Hey I can't wait to take a look at this - sorry I'm absolutely slammed at work right now so might have to be later tonight or this weekend 😬 I also have trust in @hjr3 and @charmander to make the right call. 😄

@hjr3
Collaborator

hjr3 commented Aug 21, 2025

I ran the benchmarks locally and it does appear that the "shrink buffer when < 50% utilization" approach is the fastest. I am not sure I am seeing a 5% improvement, but benchmarks on a laptop are always a bit fuzzy.

I have to spend more time on this, but thank you for the reproduction steps!
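
For anyone comparing approaches, here is a rough sketch of that shrink heuristic; the function name and the 50% threshold are illustrative, not the PR's actual code:

```typescript
// If the live data occupies less than half of the backing allocation,
// copy it into a right-sized buffer so the large one can be collected.
// Otherwise keep the existing buffer and just track the cursor.
function maybeShrink(buffer: Buffer, offset: number, length: number): { buffer: Buffer; offset: number } {
    if (length < buffer.length / 2) {
        const compacted = Buffer.allocUnsafe(length)
        buffer.copy(compacted, 0, offset, offset + length)
        return { buffer: compacted, offset: 0 }
    }
    return { buffer, offset }
}

// A 1 MiB backing buffer with only 16 live bytes left gets replaced
// by a 16-byte buffer, releasing the large allocation to the GC.
const big = Buffer.alloc(1024 * 1024)
const shrunk = maybeShrink(big, big.length - 16, 16)
console.log(shrunk.buffer.length) // 16
```

The trade-off being benchmarked is the cost of the extra copy on shrink versus the cost of the chunk bookkeeping in the virtual-buffer approach.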

@charmander
Collaborator

I agree that this is a bug, but it’s surprising (in order for the root cause to be exactly as described) that you’d never get a packet that ends on a message boundary into the parser at any point (especially in a benchmark, at the end of an iteration). What happens if you trigger garbage collection manually in the benchmark?
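
For reference, Node only exposes a manual collector when started with `--expose-gc`, so triggering GC in a benchmark needs a guard; the helper name here is illustrative:

```typescript
// Run with: node --expose-gc <file>  (otherwise globalThis.gc is undefined)
function forceGc(): boolean {
    const gc = (globalThis as { gc?: () => void }).gc
    if (typeof gc === 'function') {
        gc() // synchronous full collection, useful between benchmark iterations
        return true
    }
    return false
}

const collected = forceGc()
console.log(`manual GC available: ${collected}`)
```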

@charmander
Collaborator

The bug I’ve opened #3533 to address might be adding confusion.

@regevbr
Contributor

regevbr commented Aug 22, 2025

The bug I’ve opened #3533 to address might be adding confusion.

Great, let's rerun the benchmarks after this is merged and see if they make more sense.
