Fix: Resolve Memory Leak in Parser Buffer Management #3531
base: master
Conversation
@regevbr, I noticed that you are the author of the initial code here. Could you take a look at this change?
I don't think your example covers the situation here. Once a buffer grows, the only way for it to shrink again is in one very specific case: parsing ends and there is no more data left in the buffer. I guess that case doesn't happen for you, since the connection is never "resting". Also, since you probably have a lot of data coming in on the stream, and the processing is slower or the values are very large, the buffer gets bigger and bigger. Your solution will indeed reduce the buffer, but it can potentially reintroduce the issues that were fixed in my PR, as it will cause a lot of synchronous buffer allocations over and over again. I can suggest 2 things:
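The behavior described above can be simulated with a minimal sketch (hypothetical class and names, not the actual pg-protocol code): the backing buffer only grows as data arrives, and the single point where the large allocation is released is when every remaining byte has been consumed.

```typescript
// Toy model of the cursor-based strategy described above. The backing store
// only grows; it is released only when the buffer is fully drained.
class CursorBuffer {
  buf: Buffer = Buffer.alloc(0)
  offset = 0
  length = 0

  append(chunk: Buffer): void {
    const needed = this.length + chunk.length
    if (this.offset + needed > this.buf.length) {
      // Grow by doubling; never shrink here.
      let size = Math.max(this.buf.length * 2, 16)
      while (size < needed) size *= 2
      const next = Buffer.alloc(size)
      this.buf.copy(next, 0, this.offset, this.offset + this.length)
      this.buf = next
      this.offset = 0
    }
    chunk.copy(this.buf, this.offset + this.length)
    this.length += chunk.length
  }

  consume(n: number): void {
    this.offset += n
    this.length -= n
    if (this.length === 0) {
      // The ONLY place the grown buffer is released: parsing finished and
      // no unparsed bytes remain ("the connection is resting").
      this.buf = Buffer.alloc(0)
      this.offset = 0
    }
  }
}

const cb = new CursorBuffer()
cb.append(Buffer.alloc(1 << 20)) // a 1 MiB burst grows the backing store
cb.consume((1 << 20) - 1)        // parse all but one byte...
console.log(cb.buf.length >= 1 << 20) // true: the 1 MiB allocation is still held
cb.consume(1)
console.log(cb.buf.length) // 0: released only once fully drained
```

If the stream never fully drains, `consume` never hits the release branch, which is the retention pattern this PR is about.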
Can you at least try approach 2 locally and see if it resolves the issue for you? What are your thoughts on my approach? In addition, can you add a bench test that recreates your scenario, and then make sure that your fix holds up on it? @brianc, your input is also needed here, please.
I came here to say this as well. Even a small reproducible script would be helpful for understanding the behavior you are seeing. Also, some prior art for benchmarking: https://github.com/hjr3/node-postgres/blob/b1609b5732515f07cac32f699fbdb3d4f63d852d/packages/pg-bench/src/binary-format.ts
I recall intentionally doing this, or something similar, in the past for performance reasons. I don't consider it a memory leak to grow the parser's internal memory usage to the largest buffer received while the parser is allocated. It won't grow infinitely, so I don't think it's a "leak." It's just a space/time trade-off: "take more space to save time having to realloc memory more often." I would like to see benchmarks/numbers around the trade-off, to see what we give up performance-wise going the other way.
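The trade-off described above can be illustrated with a toy replay (a hypothetical sketch, not a real benchmark of pg-protocol): counting allocations and retained capacity for a "keep the grown capacity" strategy versus a "realloc to fit every chunk" strategy over the same chunk sizes.

```typescript
// Replay a sequence of chunk sizes and count how often each strategy would
// allocate, and how much capacity it holds on to afterwards.
function replay(sizes: number[], keepCapacity: boolean): { allocs: number; held: number } {
  let capacity = 0
  let allocs = 0
  for (const size of sizes) {
    if (size > capacity || !keepCapacity) {
      capacity = size
      allocs++ // a Buffer.allocUnsafe(...) would happen here
    }
  }
  return { allocs, held: capacity }
}

const bursty = [64, 1024, 64, 64, 1 << 20, 64, 64, 64]
console.log(replay(bursty, true))  // { allocs: 3, held: 1048576 } - rare reallocs, 1 MiB retained
console.log(replay(bursty, false)) // { allocs: 8, held: 64 } - tight memory, realloc per chunk
```

The "held" number is the space cost of the current approach; the "allocs" number is the time cost of shrinking aggressively.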
Force-pushed from 74db528 to e23b8f5 (compare)
Fix buffer shrinking: create new appropriately-sized buffer instead of just adjusting pointers when partial messages remain. This allows garbage collection of large processed buffers that were previously kept alive due to cursor-based buffer management. Resolves memory leak where large buffers would remain in memory indefinitely even when only a few bytes of data were still needed.
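The fix this commit message describes can be sketched as a small helper (hypothetical name, not the actual patch): copy the surviving partial message into a right-sized buffer so the large original loses its last reference and becomes collectible.

```typescript
// When only `length` bytes of a partial message remain in a large buffer,
// copy just those bytes out instead of keeping a cursor into the big buffer.
function retainRemainder(buf: Buffer, offset: number, length: number): Buffer {
  const kept = Buffer.allocUnsafe(length)
  buf.copy(kept, 0, offset, offset + length) // only the live bytes survive
  return kept // the old large `buf` is now unreferenced and GC-eligible
}

const big = Buffer.from('...lots of parsed data...tail')
const tail = retainRemainder(big, big.length - 4, 4)
console.log(tail.toString()) // "tail"
```

The cost is one copy per parse call with leftover bytes, which is what the later, utilization-based version tries to amortize.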
Force-pushed from e23b8f5 to 34b5c51 (compare)
Implement sophisticated buffer management that only shrinks when the buffer is more than half empty, and reduces to half size (not exact size) to provide wiggle room for incoming data. This prevents both memory leaks from oversized buffers and performance issues from excessive reallocations in high-throughput scenarios.
- Only shrink when buffer utilization < 50%
- When shrinking, reduce to max(half current size, 2x remaining data)
- Gradual reduction allows large buffers to shrink progressively
- Falls back to cursor strategy when buffer utilization is reasonable
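The shrink heuristic in this commit message reduces to a small capacity function (the function name is mine, for illustration):

```typescript
// Decide the next backing-buffer capacity given the current capacity and
// the number of live (unparsed) bytes remaining.
function nextCapacity(capacity: number, remaining: number): number {
  if (remaining >= capacity / 2) {
    return capacity // utilization >= 50%: keep the buffer, just move the cursor
  }
  // Shrink gradually: halve, but never below twice the remaining data,
  // so there is still headroom for incoming chunks.
  return Math.max(capacity / 2, remaining * 2)
}

console.log(nextCapacity(1024, 600)) // 1024: more than half full, no shrink
console.log(nextCapacity(1024, 100)) // 512: halve, headroom preserved
console.log(nextCapacity(4096, 10))  // 2048: big buffers shrink one step at a time
```

Repeated calls walk a mostly idle buffer down by halves rather than collapsing it in one step, which is the "gradual reduction" bullet above.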
Force-pushed from c161069 to 68763e8 (compare)
Sure, thank you for the comments.
@hjr3, sure, will try.
I actually took the chance to have a vibe coding challenge. I used Claude and it took 55 versions to get here haha.
import { TransformOptions } from 'stream'
import {
Mode,
bindComplete,
parseComplete,
closeComplete,
noData,
portalSuspended,
copyDone,
replicationStart,
emptyQuery,
ReadyForQueryMessage,
CommandCompleteMessage,
CopyDataMessage,
CopyResponse,
NotificationResponseMessage,
RowDescriptionMessage,
ParameterDescriptionMessage,
Field,
DataRowMessage,
ParameterStatusMessage,
BackendKeyDataMessage,
DatabaseError,
BackendMessage,
MessageName,
AuthenticationMD5Password,
NoticeMessage,
} from './messages'
import { BufferReader } from './buffer-reader'
// every message is prefixed with a single byte
const CODE_LENGTH = 1
// every message has an int32 length which includes itself but does
// NOT include the code in the length
const LEN_LENGTH = 4
const HEADER_LENGTH = CODE_LENGTH + LEN_LENGTH
const UINT32_SIZE = 4
export type Packet = {
code: number
packet: Buffer
}
const emptyBuffer = Buffer.allocUnsafe(0)
type StreamOptions = TransformOptions & {
mode: Mode
}
const enum MessageCodes {
DataRow = 0x44, // D
ParseComplete = 0x31, // 1
BindComplete = 0x32, // 2
CloseComplete = 0x33, // 3
CommandComplete = 0x43, // C
ReadyForQuery = 0x5a, // Z
NoData = 0x6e, // n
NotificationResponse = 0x41, // A
AuthenticationResponse = 0x52, // R
ParameterStatus = 0x53, // S
BackendKeyData = 0x4b, // K
ErrorMessage = 0x45, // E
NoticeMessage = 0x4e, // N
RowDescriptionMessage = 0x54, // T
ParameterDescriptionMessage = 0x74, // t
PortalSuspended = 0x73, // s
ReplicationStart = 0x57, // W
EmptyQuery = 0x49, // I
CopyIn = 0x47, // G
CopyOut = 0x48, // H
CopyDone = 0x63, // c
CopyData = 0x64, // d
}
export type MessageCallback = (msg: BackendMessage) => void
interface ParseResultMerge {
needsMerge: true
virtualOffset: number
}
interface ParseResultNoMerge {
needsMerge: false
remainingBuffer: Buffer
remainingOffset: number
remainingLength: number
}
type ParseResult = ParseResultMerge | ParseResultNoMerge
interface BufferLocation {
buffer: Buffer
offset: number
}
/**
* VirtualBuffer provides a unified view over two separate buffers,
* allowing seamless reading across buffer boundaries
*/
class VirtualBuffer {
constructor(
private oldBuffer: Buffer,
private oldOffset: number,
private oldLength: number,
private newBuffer: Buffer
) {}
get totalLength(): number {
return this.oldLength + this.newBuffer.length
}
/**
* Read a single byte at a virtual offset
*/
readByteAt(virtualOffset: number): number {
if (virtualOffset < this.oldLength) {
return this.oldBuffer[this.oldOffset + virtualOffset]
} else {
return this.newBuffer[virtualOffset - this.oldLength]
}
}
/**
* Read a UInt32BE at a virtual offset, potentially spanning both buffers
*/
readUInt32BEAt(virtualOffset: number): number {
// If the entire UInt32 is in the old buffer
if (virtualOffset + UINT32_SIZE <= this.oldLength) {
return this.oldBuffer.readUInt32BE(this.oldOffset + virtualOffset)
}
// If the entire UInt32 is in the new buffer
if (virtualOffset >= this.oldLength) {
return this.newBuffer.readUInt32BE(virtualOffset - this.oldLength)
}
// UInt32 spans both buffers - create a temporary buffer and use readUInt32BE
const tempBuffer = Buffer.allocUnsafe(UINT32_SIZE)
for (let i = 0; i < UINT32_SIZE; i++) {
tempBuffer[i] = this.readByteAt(virtualOffset + i)
}
return tempBuffer.readUInt32BE(0)
}
/**
* Get buffer location for data, extracting into a new buffer if it spans both buffers
* Always returns a BufferLocation with the appropriate buffer and offset
*/
getBufferLocation(virtualOffset: number, length: number): BufferLocation {
// Data entirely in old buffer
if (virtualOffset + length <= this.oldLength) {
return {
buffer: this.oldBuffer,
offset: this.oldOffset + virtualOffset
}
}
// Data entirely in new buffer
if (virtualOffset >= this.oldLength) {
return {
buffer: this.newBuffer,
offset: virtualOffset - this.oldLength
}
}
// Data spans both buffers - extract into a new buffer
const oldBufferPart = this.oldLength - virtualOffset
const newBufferPart = length - oldBufferPart
const extractedBuffer = Buffer.allocUnsafe(length)
this.oldBuffer.copy(extractedBuffer, 0, this.oldOffset + virtualOffset, this.oldOffset + this.oldLength)
this.newBuffer.copy(extractedBuffer, oldBufferPart, 0, newBufferPart)
return {
buffer: extractedBuffer,
offset: 0
}
}
/**
* Calculate remaining data after consuming up to virtualOffset
*/
calculateRemaining(virtualOffset: number): ParseResult {
const remainingLength = this.totalLength - virtualOffset
if (remainingLength === 0) {
// Nothing left
return {
needsMerge: false,
remainingBuffer: emptyBuffer,
remainingOffset: 0,
remainingLength: 0
}
}
// If all remaining data is in the new buffer
if (virtualOffset >= this.oldLength) {
const newOffset = virtualOffset - this.oldLength
return {
needsMerge: false,
remainingBuffer: this.newBuffer,
remainingOffset: newOffset,
remainingLength: remainingLength
}
}
// Remaining data spans both buffers - need to merge
return {
needsMerge: true,
virtualOffset
}
}
}
export class Parser {
private buffer: Buffer = emptyBuffer
private bufferLength: number = 0
private bufferOffset: number = 0
private reader = new BufferReader()
private mode: Mode
constructor(opts?: StreamOptions) {
if (opts?.mode === 'binary') {
throw new Error('Binary mode not supported yet')
}
this.mode = opts?.mode || 'text'
}
public parse(buffer: Buffer, callback: MessageCallback) {
const parseResult = this.parseVirtualBuffer(buffer, callback)
if (parseResult.needsMerge) {
this.updateBufferAfterParsing(parseResult.virtualOffset)
this.mergeBuffer(buffer)
} else {
this.setRemainingBuffer(parseResult)
}
}
private updateBufferAfterParsing(consumedBytes: number): void {
this.bufferOffset += consumedBytes
this.bufferLength -= consumedBytes
}
private setRemainingBuffer(parseResult: ParseResultNoMerge): void {
this.buffer = parseResult.remainingBuffer
this.bufferOffset = parseResult.remainingOffset
this.bufferLength = parseResult.remainingLength
}
private parseVirtualBuffer(newBuffer: Buffer, callback: MessageCallback): ParseResult {
const virtualBuffer = new VirtualBuffer(this.buffer, this.bufferOffset, this.bufferLength, newBuffer)
let virtualOffset = 0
while (virtualOffset + HEADER_LENGTH <= virtualBuffer.totalLength) {
const messageInfo = this.readMessageHeader(virtualBuffer, virtualOffset)
const fullMessageLength = CODE_LENGTH + messageInfo.length
if (virtualOffset + fullMessageLength <= virtualBuffer.totalLength) {
this.processCompleteMessage(virtualBuffer, virtualOffset, messageInfo, callback)
virtualOffset += fullMessageLength
} else {
break // Incomplete message, stop parsing
}
}
return virtualBuffer.calculateRemaining(virtualOffset)
}
private readMessageHeader(virtualBuffer: VirtualBuffer, virtualOffset: number): { code: number; length: number } {
const code = virtualBuffer.readByteAt(virtualOffset)
const length = virtualBuffer.readUInt32BEAt(virtualOffset + CODE_LENGTH)
return { code, length }
}
private processCompleteMessage(
virtualBuffer: VirtualBuffer,
virtualOffset: number,
messageInfo: { code: number; length: number },
callback: MessageCallback
): void {
const messageStartOffset = virtualOffset + HEADER_LENGTH
const bufferLocation = virtualBuffer.getBufferLocation(messageStartOffset, messageInfo.length)
const message = this.handlePacket(bufferLocation.offset, messageInfo.code, messageInfo.length, bufferLocation.buffer)
callback(message)
}
private mergeBuffer(buffer: Buffer): void {
if (this.bufferLength > 0) {
const newLength = this.bufferLength + buffer.byteLength
const newFullLength = newLength + this.bufferOffset
if (newFullLength > this.buffer.byteLength) {
// We can't concat the new buffer with the remaining one
let newBuffer: Buffer
if (newLength <= this.buffer.byteLength && this.bufferOffset >= this.bufferLength) {
// We can move the relevant part to the beginning of the buffer instead of allocating a new buffer
newBuffer = this.buffer
} else {
// Allocate a new larger buffer
let newBufferLength = this.buffer.byteLength * 2
while (newLength >= newBufferLength) {
newBufferLength *= 2
}
newBuffer = Buffer.allocUnsafe(newBufferLength)
}
// Move the remaining buffer to the new one
this.buffer.copy(newBuffer, 0, this.bufferOffset, this.bufferOffset + this.bufferLength)
this.buffer = newBuffer
this.bufferOffset = 0
}
// Concat the new buffer with the remaining one
buffer.copy(this.buffer, this.bufferOffset + this.bufferLength)
this.bufferLength = newLength
} else {
this.buffer = buffer
this.bufferOffset = 0
this.bufferLength = buffer.byteLength
}
}
private handlePacket(offset: number, code: number, length: number, bytes: Buffer): BackendMessage {
switch (code) {
case MessageCodes.BindComplete:
return bindComplete
case MessageCodes.ParseComplete:
return parseComplete
case MessageCodes.CloseComplete:
return closeComplete
case MessageCodes.NoData:
return noData
case MessageCodes.PortalSuspended:
return portalSuspended
case MessageCodes.CopyDone:
return copyDone
case MessageCodes.ReplicationStart:
return replicationStart
case MessageCodes.EmptyQuery:
return emptyQuery
case MessageCodes.DataRow:
return this.parseDataRowMessage(offset, length, bytes)
case MessageCodes.CommandComplete:
return this.parseCommandCompleteMessage(offset, length, bytes)
case MessageCodes.ReadyForQuery:
return this.parseReadyForQueryMessage(offset, length, bytes)
case MessageCodes.NotificationResponse:
return this.parseNotificationMessage(offset, length, bytes)
case MessageCodes.AuthenticationResponse:
return this.parseAuthenticationResponse(offset, length, bytes)
case MessageCodes.ParameterStatus:
return this.parseParameterStatusMessage(offset, length, bytes)
case MessageCodes.BackendKeyData:
return this.parseBackendKeyData(offset, length, bytes)
case MessageCodes.ErrorMessage:
return this.parseErrorMessage(offset, length, bytes, 'error')
case MessageCodes.NoticeMessage:
return this.parseErrorMessage(offset, length, bytes, 'notice')
case MessageCodes.RowDescriptionMessage:
return this.parseRowDescriptionMessage(offset, length, bytes)
case MessageCodes.ParameterDescriptionMessage:
return this.parseParameterDescriptionMessage(offset, length, bytes)
case MessageCodes.CopyIn:
return this.parseCopyInMessage(offset, length, bytes)
case MessageCodes.CopyOut:
return this.parseCopyOutMessage(offset, length, bytes)
case MessageCodes.CopyData:
return this.parseCopyData(offset, length, bytes)
default:
return new DatabaseError('received invalid response: ' + code.toString(16), length, 'error')
}
}
private parseReadyForQueryMessage(offset: number, length: number, bytes: Buffer) {
this.reader.setBuffer(offset, bytes)
const status = this.reader.string(1)
return new ReadyForQueryMessage(length, status)
}
private parseCommandCompleteMessage(offset: number, length: number, bytes: Buffer) {
this.reader.setBuffer(offset, bytes)
const text = this.reader.cstring()
return new CommandCompleteMessage(length, text)
}
private parseCopyData(offset: number, length: number, bytes: Buffer) {
const chunk = bytes.slice(offset, offset + (length - 4))
return new CopyDataMessage(length, chunk)
}
private parseCopyInMessage(offset: number, length: number, bytes: Buffer) {
return this.parseCopyMessage(offset, length, bytes, 'copyInResponse')
}
private parseCopyOutMessage(offset: number, length: number, bytes: Buffer) {
return this.parseCopyMessage(offset, length, bytes, 'copyOutResponse')
}
private parseCopyMessage(offset: number, length: number, bytes: Buffer, messageName: MessageName) {
this.reader.setBuffer(offset, bytes)
const isBinary = this.reader.byte() !== 0
const columnCount = this.reader.int16()
const message = new CopyResponse(length, messageName, isBinary, columnCount)
for (let i = 0; i < columnCount; i++) {
message.columnTypes[i] = this.reader.int16()
}
return message
}
private parseNotificationMessage(offset: number, length: number, bytes: Buffer) {
this.reader.setBuffer(offset, bytes)
const processId = this.reader.int32()
const channel = this.reader.cstring()
const payload = this.reader.cstring()
return new NotificationResponseMessage(length, processId, channel, payload)
}
private parseRowDescriptionMessage(offset: number, length: number, bytes: Buffer) {
this.reader.setBuffer(offset, bytes)
const fieldCount = this.reader.int16()
const message = new RowDescriptionMessage(length, fieldCount)
for (let i = 0; i < fieldCount; i++) {
message.fields[i] = this.parseField()
}
return message
}
private parseField(): Field {
const name = this.reader.cstring()
const tableID = this.reader.uint32()
const columnID = this.reader.int16()
const dataTypeID = this.reader.uint32()
const dataTypeSize = this.reader.int16()
const dataTypeModifier = this.reader.int32()
const mode = this.reader.int16() === 0 ? 'text' : 'binary'
return new Field(name, tableID, columnID, dataTypeID, dataTypeSize, dataTypeModifier, mode)
}
private parseParameterDescriptionMessage(offset: number, length: number, bytes: Buffer) {
this.reader.setBuffer(offset, bytes)
const parameterCount = this.reader.int16()
const message = new ParameterDescriptionMessage(length, parameterCount)
for (let i = 0; i < parameterCount; i++) {
message.dataTypeIDs[i] = this.reader.int32()
}
return message
}
private parseDataRowMessage(offset: number, length: number, bytes: Buffer) {
this.reader.setBuffer(offset, bytes)
const fieldCount = this.reader.int16()
const fields: any[] = new Array(fieldCount)
for (let i = 0; i < fieldCount; i++) {
const len = this.reader.int32()
// a -1 for length means the value of the field is null
fields[i] = len === -1 ? null : this.reader.string(len)
}
return new DataRowMessage(length, fields)
}
private parseParameterStatusMessage(offset: number, length: number, bytes: Buffer) {
this.reader.setBuffer(offset, bytes)
const name = this.reader.cstring()
const value = this.reader.cstring()
return new ParameterStatusMessage(length, name, value)
}
private parseBackendKeyData(offset: number, length: number, bytes: Buffer) {
this.reader.setBuffer(offset, bytes)
const processID = this.reader.int32()
const secretKey = this.reader.int32()
return new BackendKeyDataMessage(length, processID, secretKey)
}
public parseAuthenticationResponse(offset: number, length: number, bytes: Buffer) {
this.reader.setBuffer(offset, bytes)
const code = this.reader.int32()
// TODO(bmc): maybe better types here
const message: BackendMessage & any = {
name: 'authenticationOk',
length,
}
switch (code) {
case 0: // AuthenticationOk
break
case 3: // AuthenticationCleartextPassword
if (message.length === 8) {
message.name = 'authenticationCleartextPassword'
}
break
case 5: // AuthenticationMD5Password
if (message.length === 12) {
message.name = 'authenticationMD5Password'
const salt = this.reader.bytes(4)
return new AuthenticationMD5Password(length, salt)
}
break
case 10: // AuthenticationSASL
{
message.name = 'authenticationSASL'
message.mechanisms = []
let mechanism: string
do {
mechanism = this.reader.cstring()
if (mechanism) {
message.mechanisms.push(mechanism)
}
} while (mechanism)
}
break
case 11: // AuthenticationSASLContinue
message.name = 'authenticationSASLContinue'
message.data = this.reader.string(length - 8)
break
case 12: // AuthenticationSASLFinal
message.name = 'authenticationSASLFinal'
message.data = this.reader.string(length - 8)
break
default:
throw new Error('Unknown authenticationOk message type ' + code)
}
return message
}
private parseErrorMessage(offset: number, length: number, bytes: Buffer, name: MessageName) {
this.reader.setBuffer(offset, bytes)
const fields: Record<string, string> = {}
let fieldType = this.reader.string(1)
while (fieldType !== '\0') {
fields[fieldType] = this.reader.cstring()
fieldType = this.reader.string(1)
}
const messageValue = fields.M
const message =
name === 'notice' ? new NoticeMessage(length, messageValue) : new DatabaseError(messageValue, length, name)
message.severity = fields.S
message.code = fields.C
message.detail = fields.D
message.hint = fields.H
message.position = fields.P
message.internalPosition = fields.p
message.internalQuery = fields.q
message.where = fields.W
message.schema = fields.s
message.table = fields.t
message.column = fields.c
message.dataType = fields.d
message.constraint = fields.n
message.file = fields.F
message.line = fields.L
message.routine = fields.R
return message
}
}
@hjr3, thank you for referencing the benchmarks - they were very useful. @regevbr, I've also run the benchmark against your solution - please take a look at the results.
Awesome, I created another version, in which we don't merge buffers at all; we only copy when extracting concrete data out of them (like strings). Can you please benchmark it as well?
import { TransformOptions } from 'stream'
import {
Mode,
bindComplete,
parseComplete,
closeComplete,
noData,
portalSuspended,
copyDone,
replicationStart,
emptyQuery,
ReadyForQueryMessage,
CommandCompleteMessage,
CopyDataMessage,
CopyResponse,
NotificationResponseMessage,
RowDescriptionMessage,
ParameterDescriptionMessage,
Field,
DataRowMessage,
ParameterStatusMessage,
BackendKeyDataMessage,
DatabaseError,
BackendMessage,
MessageName,
AuthenticationMD5Password,
NoticeMessage,
} from './messages'
import { BufferReader } from './buffer-reader'
// every message is prefixed with a single byte
const CODE_LENGTH = 1
// every message has an int32 length which includes itself but does
// NOT include the code in the length
const LEN_LENGTH = 4
const HEADER_LENGTH = CODE_LENGTH + LEN_LENGTH
const UINT32_SIZE = 4
export type Packet = {
code: number
packet: Buffer
}
const emptyBuffer = Buffer.allocUnsafe(0)
type StreamOptions = TransformOptions & {
mode: Mode
}
const enum MessageCodes {
DataRow = 0x44, // D
ParseComplete = 0x31, // 1
BindComplete = 0x32, // 2
CloseComplete = 0x33, // 3
CommandComplete = 0x43, // C
ReadyForQuery = 0x5a, // Z
NoData = 0x6e, // n
NotificationResponse = 0x41, // A
AuthenticationResponse = 0x52, // R
ParameterStatus = 0x53, // S
BackendKeyData = 0x4b, // K
ErrorMessage = 0x45, // E
NoticeMessage = 0x4e, // N
RowDescriptionMessage = 0x54, // T
ParameterDescriptionMessage = 0x74, // t
PortalSuspended = 0x73, // s
ReplicationStart = 0x57, // W
EmptyQuery = 0x49, // I
CopyIn = 0x47, // G
CopyOut = 0x48, // H
CopyDone = 0x63, // c
CopyData = 0x64, // d
}
export type MessageCallback = (msg: BackendMessage) => void
interface BufferChunk {
buffer: Buffer
offset: number
length: number
}
/**
* VirtualBuffer provides a unified view over multiple buffers without merging them
* Implements the same interface as BufferReader for compatibility
*/
class VirtualBuffer {
private chunks: BufferChunk[] = []
private _totalLength = 0
private currentPosition = 0
  private encoding: BufferEncoding = 'utf-8'
constructor(chunks: BufferChunk[] = []) {
this.chunks = chunks.slice()
this._totalLength = chunks.reduce((sum, chunk) => sum + chunk.length, 0)
}
get totalLength(): number {
return this._totalLength
}
get position(): number {
return this.currentPosition
}
/**
* Add a new buffer chunk to the virtual buffer
*/
addChunk(buffer: Buffer, offset: number = 0, length: number = buffer.length - offset): void {
if (length > 0) {
this.chunks.push({ buffer, offset, length })
this._totalLength += length
}
}
/**
* Peek at a byte at the current position without advancing
*/
peekByte(): number {
if (this.currentPosition >= this._totalLength) {
throw new Error('Buffer overflow: attempted to peek beyond buffer end')
}
return this.readByteAt(this.currentPosition)
}
/**
* Check if we can read a certain number of bytes from current position
*/
canRead(bytes: number): boolean {
return this.currentPosition + bytes <= this._totalLength
}
// BufferReader compatible methods
/**
* Read a single byte at the current position and advance
*/
byte(): number {
if (this.currentPosition >= this._totalLength) {
throw new Error('Buffer overflow: attempted to read beyond buffer end')
}
const byte = this.readByteAt(this.currentPosition)
this.currentPosition++
return byte
}
/**
* Read a signed 16-bit integer (big endian) and advance
*/
int16(): number {
if (this.currentPosition + 2 > this._totalLength) {
throw new Error('Buffer overflow: attempted to read beyond buffer end')
}
const value = this.readUInt16BEAt(this.currentPosition)
this.currentPosition += 2
// Convert unsigned to signed
return value > 0x7FFF ? value - 0x10000 : value
}
/**
* Read a signed 32-bit integer (big endian) and advance
*/
int32(): number {
if (this.currentPosition + UINT32_SIZE > this._totalLength) {
throw new Error('Buffer overflow: attempted to read beyond buffer end')
}
const value = this.readUInt32BEAt(this.currentPosition)
this.currentPosition += UINT32_SIZE
// Convert unsigned to signed
return value > 0x7FFFFFFF ? value - 0x100000000 : value
}
/**
* Read an unsigned 32-bit integer (big endian) and advance
*/
uint32(): number {
if (this.currentPosition + UINT32_SIZE > this._totalLength) {
throw new Error('Buffer overflow: attempted to read beyond buffer end')
}
const value = this.readUInt32BEAt(this.currentPosition)
this.currentPosition += UINT32_SIZE
return value
}
/**
* Read a string of specified length and advance
*/
string(length: number): string {
const bytes = this.bytes(length)
return bytes.toString(this.encoding)
}
/**
* Read a null-terminated string and advance
*/
cstring(): string {
const start = this.currentPosition
let end = start
// Find the null terminator
while (end < this._totalLength && this.readByteAt(end) !== 0) {
end++
}
// Read the string content (without null terminator)
const length = end - start
const result = length > 0 ? this.string(length) : ''
// Skip the null terminator if we found one
if (end < this._totalLength) {
this.currentPosition++ // Skip null byte
}
return result
}
/**
* Read bytes at the current position and advance
*/
bytes(length: number): Buffer {
if (this.currentPosition + length > this._totalLength) {
throw new Error('Buffer overflow: attempted to read beyond buffer end')
}
const result = this.getBytesAt(this.currentPosition, length)
this.currentPosition += length
return result
}
/**
* Create a new VirtualBuffer from current position onwards
*/
slice(start: number = this.currentPosition, end?: number): VirtualBuffer {
const endPos = end ?? this._totalLength
const newChunks: BufferChunk[] = []
let virtualPos = 0
for (const chunk of this.chunks) {
const chunkEnd = virtualPos + chunk.length
if (virtualPos >= endPos) break
if (chunkEnd <= start) {
virtualPos = chunkEnd
continue
}
const chunkStart = Math.max(start - virtualPos, 0)
const chunkActualEnd = Math.min(endPos - virtualPos, chunk.length)
const newLength = chunkActualEnd - chunkStart
if (newLength > 0) {
newChunks.push({
buffer: chunk.buffer,
offset: chunk.offset + chunkStart,
length: newLength
})
}
virtualPos = chunkEnd
}
return new VirtualBuffer(newChunks)
}
/**
* Seek to a specific position
*/
seek(position: number): void {
if (position < 0 || position > this._totalLength) {
throw new Error(`Invalid seek position: ${position}`)
}
this.currentPosition = position
}
/**
* Remove consumed chunks to free memory
*/
compact(): void {
if (this.currentPosition === 0) return
let remainingOffset = this.currentPosition
let newChunks: BufferChunk[] = []
for (const chunk of this.chunks) {
if (remainingOffset >= chunk.length) {
remainingOffset -= chunk.length
continue
}
if (remainingOffset > 0) {
newChunks.push({
buffer: chunk.buffer,
offset: chunk.offset + remainingOffset,
length: chunk.length - remainingOffset
})
remainingOffset = 0
} else {
newChunks.push(chunk)
}
}
this.chunks = newChunks
this._totalLength -= this.currentPosition
this.currentPosition = 0
}
// Private helper methods
private findChunkAndOffset(virtualOffset: number): { chunkIndex: number; offsetInChunk: number } | null {
let currentOffset = 0
for (let i = 0; i < this.chunks.length; i++) {
const chunk = this.chunks[i]
if (virtualOffset >= currentOffset && virtualOffset < currentOffset + chunk.length) {
return {
chunkIndex: i,
offsetInChunk: virtualOffset - currentOffset
}
}
currentOffset += chunk.length
}
return null
}
private readByteAt(virtualOffset: number): number {
const location = this.findChunkAndOffset(virtualOffset)
if (!location) {
throw new Error(`Invalid offset: ${virtualOffset}`)
}
const chunk = this.chunks[location.chunkIndex]
return chunk.buffer[chunk.offset + location.offsetInChunk]
}
private readUInt32BEAt(virtualOffset: number): number {
const location = this.findChunkAndOffset(virtualOffset)
if (!location) {
throw new Error(`Invalid offset: ${virtualOffset}`)
}
const chunk = this.chunks[location.chunkIndex]
// If the entire UInt32 fits in this chunk
if (location.offsetInChunk + UINT32_SIZE <= chunk.length) {
return chunk.buffer.readUInt32BE(chunk.offset + location.offsetInChunk)
}
// UInt32 spans multiple chunks - read byte by byte
const tempBuffer = Buffer.allocUnsafe(UINT32_SIZE)
for (let i = 0; i < UINT32_SIZE; i++) {
tempBuffer[i] = this.readByteAt(virtualOffset + i)
}
return tempBuffer.readUInt32BE(0)
}
private readUInt16BEAt(virtualOffset: number): number {
const location = this.findChunkAndOffset(virtualOffset)
if (!location) {
throw new Error(`Invalid offset: ${virtualOffset}`)
}
const chunk = this.chunks[location.chunkIndex]
// If the entire UInt16 fits in this chunk
if (location.offsetInChunk + 2 <= chunk.length) {
return chunk.buffer.readUInt16BE(chunk.offset + location.offsetInChunk)
}
// UInt16 spans chunks - read byte by byte
const tempBuffer = Buffer.allocUnsafe(2)
for (let i = 0; i < 2; i++) {
tempBuffer[i] = this.readByteAt(virtualOffset + i)
}
return tempBuffer.readUInt16BE(0)
}
private getBytesAt(virtualOffset: number, length: number): Buffer {
if (length === 0) return emptyBuffer
const location = this.findChunkAndOffset(virtualOffset)
if (!location) {
throw new Error(`Invalid offset: ${virtualOffset}`)
}
const startChunk = this.chunks[location.chunkIndex]
// If all bytes fit in a single chunk, return a slice
if (location.offsetInChunk + length <= startChunk.length) {
return startChunk.buffer.subarray(
startChunk.offset + location.offsetInChunk,
startChunk.offset + location.offsetInChunk + length
)
}
// Bytes span multiple chunks - copy into new buffer
const result = Buffer.allocUnsafe(length)
let resultOffset = 0
let remainingLength = length
let currentVirtualOffset = virtualOffset
while (remainingLength > 0) {
const loc = this.findChunkAndOffset(currentVirtualOffset)
if (!loc) break
const chunk = this.chunks[loc.chunkIndex]
const availableInChunk = chunk.length - loc.offsetInChunk
const toCopy = Math.min(remainingLength, availableInChunk)
chunk.buffer.copy(
result,
resultOffset,
chunk.offset + loc.offsetInChunk,
chunk.offset + loc.offsetInChunk + toCopy
)
resultOffset += toCopy
remainingLength -= toCopy
currentVirtualOffset += toCopy
}
return result
}
}
export class Parser {
private virtualBuffer = new VirtualBuffer()
private mode: Mode
constructor(opts?: StreamOptions) {
if (opts?.mode === 'binary') {
throw new Error('Binary mode not supported yet')
}
this.mode = opts?.mode || 'text'
}
public parse(buffer: Buffer, callback: MessageCallback): void {
// Add new buffer to virtual buffer
this.virtualBuffer.addChunk(buffer)
// Parse all complete messages
while (this.virtualBuffer.canRead(HEADER_LENGTH)) {
const startPosition = this.virtualBuffer.position
// Read message header
const code = this.virtualBuffer.byte()
const length = this.virtualBuffer.uint32()
const payloadLength = length - LEN_LENGTH
// Check if we have the complete message
if (!this.virtualBuffer.canRead(payloadLength)) {
// Incomplete message, rewind and wait for more data
this.virtualBuffer.seek(startPosition)
break
}
// Create a slice for just this message's payload
const messageBuffer = this.virtualBuffer.slice(
this.virtualBuffer.position,
this.virtualBuffer.position + payloadLength
)
// Advance past this message
this.virtualBuffer.seek(this.virtualBuffer.position + payloadLength)
// Process the message using the virtual buffer directly
const message = this.handlePacketWithVirtualBuffer(code, length, messageBuffer)
callback(message)
}
// Compact the virtual buffer to free consumed chunks
this.virtualBuffer.compact()
}
private handlePacketWithVirtualBuffer(code: number, length: number, messageBuffer: VirtualBuffer): BackendMessage {
switch (code) {
case MessageCodes.BindComplete:
return bindComplete
case MessageCodes.ParseComplete:
return parseComplete
case MessageCodes.CloseComplete:
return closeComplete
case MessageCodes.NoData:
return noData
case MessageCodes.PortalSuspended:
return portalSuspended
case MessageCodes.CopyDone:
return copyDone
case MessageCodes.ReplicationStart:
return replicationStart
case MessageCodes.EmptyQuery:
return emptyQuery
case MessageCodes.DataRow:
return this.parseDataRowMessage(length, messageBuffer)
case MessageCodes.CommandComplete:
return this.parseCommandCompleteMessage(length, messageBuffer)
case MessageCodes.ReadyForQuery:
return this.parseReadyForQueryMessage(length, messageBuffer)
case MessageCodes.NotificationResponse:
return this.parseNotificationMessage(length, messageBuffer)
case MessageCodes.AuthenticationResponse:
return this.parseAuthenticationResponse(length, messageBuffer)
case MessageCodes.ParameterStatus:
return this.parseParameterStatusMessage(length, messageBuffer)
case MessageCodes.BackendKeyData:
return this.parseBackendKeyData(length, messageBuffer)
case MessageCodes.ErrorMessage:
return this.parseErrorMessage(length, 'error', messageBuffer)
case MessageCodes.NoticeMessage:
return this.parseErrorMessage(length, 'notice', messageBuffer)
case MessageCodes.RowDescriptionMessage:
return this.parseRowDescriptionMessage(length, messageBuffer)
case MessageCodes.ParameterDescriptionMessage:
return this.parseParameterDescriptionMessage(length, messageBuffer)
case MessageCodes.CopyIn:
return this.parseCopyInMessage(length, messageBuffer)
case MessageCodes.CopyOut:
return this.parseCopyOutMessage(length, messageBuffer)
case MessageCodes.CopyData:
return this.parseCopyDataMessage(length, messageBuffer)
default:
return new DatabaseError('received invalid response: ' + code.toString(16), length, 'error')
}
}
// Updated parsing methods that use VirtualBuffer directly
private parseReadyForQueryMessage(length: number, reader: VirtualBuffer): ReadyForQueryMessage {
const status = reader.string(1)
return new ReadyForQueryMessage(length, status)
}
private parseCommandCompleteMessage(length: number, reader: VirtualBuffer): CommandCompleteMessage {
const text = reader.cstring()
return new CommandCompleteMessage(length, text)
}
private parseCopyDataMessage(length: number, reader: VirtualBuffer): CopyDataMessage {
const chunk = reader.bytes(length - 4)
return new CopyDataMessage(length, chunk)
}
private parseCopyInMessage(length: number, reader: VirtualBuffer): CopyResponse {
return this.parseCopyMessage(length, 'copyInResponse', reader)
}
private parseCopyOutMessage(length: number, reader: VirtualBuffer): CopyResponse {
return this.parseCopyMessage(length, 'copyOutResponse', reader)
}
private parseCopyMessage(length: number, messageName: MessageName, reader: VirtualBuffer): CopyResponse {
const isBinary = reader.byte() !== 0
const columnCount = reader.int16()
const message = new CopyResponse(length, messageName, isBinary, columnCount)
for (let i = 0; i < columnCount; i++) {
message.columnTypes[i] = reader.int16()
}
return message
}
private parseNotificationMessage(length: number, reader: VirtualBuffer): NotificationResponseMessage {
const processId = reader.int32()
const channel = reader.cstring()
const payload = reader.cstring()
return new NotificationResponseMessage(length, processId, channel, payload)
}
private parseRowDescriptionMessage(length: number, reader: VirtualBuffer): RowDescriptionMessage {
const fieldCount = reader.int16()
const message = new RowDescriptionMessage(length, fieldCount)
for (let i = 0; i < fieldCount; i++) {
message.fields[i] = this.parseField(reader)
}
return message
}
private parseField(reader: VirtualBuffer): Field {
const name = reader.cstring()
const tableID = reader.uint32()
const columnID = reader.int16()
const dataTypeID = reader.uint32()
const dataTypeSize = reader.int16()
const dataTypeModifier = reader.int32()
const mode = reader.int16() === 0 ? 'text' : 'binary'
return new Field(name, tableID, columnID, dataTypeID, dataTypeSize, dataTypeModifier, mode)
}
private parseParameterDescriptionMessage(length: number, reader: VirtualBuffer): ParameterDescriptionMessage {
const parameterCount = reader.int16()
const message = new ParameterDescriptionMessage(length, parameterCount)
for (let i = 0; i < parameterCount; i++) {
message.dataTypeIDs[i] = reader.int32()
}
return message
}
private parseDataRowMessage(length: number, reader: VirtualBuffer): DataRowMessage {
const fieldCount = reader.int16()
const fields: any[] = new Array(fieldCount)
for (let i = 0; i < fieldCount; i++) {
const len = reader.int32()
// a -1 for length means the value of the field is null
fields[i] = len === -1 ? null : reader.string(len)
}
return new DataRowMessage(length, fields)
}
private parseParameterStatusMessage(length: number, reader: VirtualBuffer): ParameterStatusMessage {
const name = reader.cstring()
const value = reader.cstring()
return new ParameterStatusMessage(length, name, value)
}
private parseBackendKeyData(length: number, reader: VirtualBuffer): BackendKeyDataMessage {
const processID = reader.int32()
const secretKey = reader.int32()
return new BackendKeyDataMessage(length, processID, secretKey)
}
private parseAuthenticationResponse(length: number, reader: VirtualBuffer): BackendMessage {
const code = reader.int32()
const message: BackendMessage & any = {
name: 'authenticationOk',
length,
}
switch (code) {
case 0: // AuthenticationOk
break
case 3: // AuthenticationCleartextPassword
if (message.length === 8) {
message.name = 'authenticationCleartextPassword'
}
break
case 5: // AuthenticationMD5Password
if (message.length === 12) {
message.name = 'authenticationMD5Password'
const salt = reader.bytes(4)
return new AuthenticationMD5Password(length, salt)
}
break
case 10: // AuthenticationSASL
{
message.name = 'authenticationSASL'
message.mechanisms = []
let mechanism: string
do {
mechanism = reader.cstring()
if (mechanism) {
message.mechanisms.push(mechanism)
}
} while (mechanism)
}
break
case 11: // AuthenticationSASLContinue
message.name = 'authenticationSASLContinue'
message.data = reader.string(length - 8)
break
case 12: // AuthenticationSASLFinal
message.name = 'authenticationSASLFinal'
message.data = reader.string(length - 8)
break
default:
throw new Error('Unknown authenticationOk message type ' + code)
}
return message
}
private parseErrorMessage(length: number, name: MessageName, reader: VirtualBuffer): BackendMessage {
const fields: Record<string, string> = {}
let fieldType = reader.string(1)
while (fieldType !== '\0') {
fields[fieldType] = reader.cstring()
fieldType = reader.string(1)
}
const messageValue = fields.M
const message =
name === 'notice' ? new NoticeMessage(length, messageValue) : new DatabaseError(messageValue, length, name)
message.severity = fields.S
message.code = fields.C
message.detail = fields.D
message.hint = fields.H
message.position = fields.P
message.internalPosition = fields.p
message.internalQuery = fields.q
message.where = fields.W
message.schema = fields.s
message.table = fields.t
message.column = fields.c
message.dataType = fields.d
message.constraint = fields.n
message.file = fields.F
message.line = fields.L
message.routine = fields.R
return message
}
}
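The parse() loop above relies on the PostgreSQL backend framing: a 1-byte message code followed by a 4-byte big-endian length that counts itself but not the code byte. A minimal standalone sketch of that framing (illustrative only — this is not the VirtualBuffer code, and `splitMessages` is a hypothetical helper):

```typescript
// Illustrative sketch of the framing parse() relies on; not the PR code.
// PostgreSQL backend messages: 1-byte code + uint32 big-endian length,
// where the length counts itself (4 bytes) but not the code byte.
const HEADER_LENGTH = 5

function splitMessages(buf: Buffer): Array<{ code: number; payload: Buffer }> {
  const out: Array<{ code: number; payload: Buffer }> = []
  let offset = 0
  while (buf.length - offset >= HEADER_LENGTH) {
    const code = buf.readUInt8(offset)
    const length = buf.readUInt32BE(offset + 1)
    const payloadLength = length - 4
    // Stop on a partial message; a real parser buffers the tail until
    // the next chunk arrives (this is what canRead() checks above).
    if (buf.length - offset - HEADER_LENGTH < payloadLength) break
    out.push({
      code,
      payload: buf.subarray(offset + HEADER_LENGTH, offset + HEADER_LENGTH + payloadLength),
    })
    offset += HEADER_LENGTH + payloadLength
  }
  return out
}

// ReadyForQuery: code 'Z' (0x5a), length 5, status byte 'I' (idle)
const msgs = splitMessages(Buffer.from([0x5a, 0, 0, 0, 5, 0x49]))
console.log(msgs[0].code.toString(16), msgs[0].payload.toString()) // 5a I
```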
I do like where this virtual buffer approach is headed. I think that's probably somewhat ideal - both good on space and doesn't allocate or copy any buffers. I think I wanted to take a stab at that long ago and didn't for whatever reason (probably laziness 😛 )
We also need to benchmark the CPU utilization between all approaches (and compare them to the original code before my PR)
@regevbr, I've run the benchmarks again and checked the results - it looks like the first version is better.
Thanks. Exciting, I wonder how that can even be... @brianc thoughts? The fact that my first version is slightly worse than @wpietrucha's is also mind-boggling
Hey, I can't wait to take a look at this - sorry, I'm absolutely slammed at work right now so it might have to be later tonight or this weekend 😬 I also have trust in @hjr3 and @charmander to make the right call. 😄
I ran the benchmarks locally and it does appear that the "shrink buffer when < 50% utilization" approach is the fastest. I am not sure I am seeing a 5% improvement, but benchmarks on a laptop are always a bit fuzzy. I have to spend more time on this, but thank you for the reproduction steps!
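For reference, a minimal sketch of a "shrink when under 50% utilization" policy like the one being benchmarked here (the function name, signature, and exact threshold are assumptions for illustration, not the merged code):

```typescript
// Hypothetical sketch of a utilization-threshold shrink policy; names
// and the 50% threshold are assumptions, not the actual implementation.
function maybeShrink(
  buffer: Buffer,
  offset: number,
  length: number
): { buffer: Buffer; offset: number; length: number } {
  const remaining = length - offset
  // Reallocate only when live data occupies less than half the buffer,
  // so a hot stream of large messages does not realloc on every chunk.
  if (buffer.length > 0 && remaining * 2 < buffer.length) {
    const shrunk = Buffer.allocUnsafe(remaining)
    buffer.copy(shrunk, 0, offset, length)
    return { buffer: shrunk, offset: 0, length: remaining }
  }
  return { buffer, offset, length }
}

// A 1 MiB buffer with a 16-byte unconsumed tail gets shrunk...
const grown = Buffer.alloc(1024 * 1024)
const shrunkState = maybeShrink(grown, grown.length - 16, grown.length)
console.log(shrunkState.buffer.length) // 16
// ...while a mostly-full buffer is left alone to avoid realloc churn.
const keptState = maybeShrink(grown, 1024, grown.length)
console.log(keptState.buffer.length === grown.length) // true
```

The threshold amortizes copy cost: shrinking on every call would reintroduce the sync-allocation churn the earlier PR fixed, while never shrinking retains the peak-sized buffer indefinitely.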
I agree that this is a bug, but it’s surprising (in order for the root cause to be exactly as described) that you’d never get a packet that ends on a message boundary into the parser at any point (especially in a benchmark, at the end of an iteration). What happens if you trigger garbage collection manually in the benchmark?
The bug I’ve opened #3533 to address might be adding confusion.
Great, let's rerun the benchmarks after this is merged and see if they make more sense.
Fix: Resolve Memory Leak in Parser Buffer Management
Problem
The Parser class in pg-protocol had a significant memory leak caused by improper buffer management. Large buffers would remain in memory indefinitely, even after their data was processed, leading to steadily increasing memory usage in long-running applications.

Here is what we observed in our application:
compressed-memory-leak-small.mov
Root Cause
When the parser processes messages and has remaining partial data, it would only adjust buffer cursors (this.bufferOffset) instead of actually shrinking the buffer. This meant that if a very large message (e.g., 1MB) arrived, the internal buffer would grow to accommodate it but never shrink back, even if only a few bytes were still needed.

Result: The parser could hold references to very large buffers indefinitely, preventing garbage collection.
Solution
When partial messages remain, create a new appropriately-sized buffer and copy only the remaining data:
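A minimal sketch of that idea (illustrative only — `shrinkToRemaining` is a hypothetical helper, and the parameter names mirror the description above rather than the exact patch):

```typescript
// Illustrative sketch of the fix: instead of advancing bufferOffset and
// keeping the oversized backing buffer alive, copy the unconsumed tail
// into a right-sized buffer so the large one becomes collectable.
function shrinkToRemaining(buffer: Buffer, bufferOffset: number, bufferLength: number): Buffer {
  const remaining = bufferLength - bufferOffset
  if (remaining === 0) {
    // Nothing pending: drop the reference to the old buffer entirely.
    return Buffer.allocUnsafe(0)
  }
  const shrunk = Buffer.allocUnsafe(remaining)
  buffer.copy(shrunk, 0, bufferOffset, bufferLength)
  return shrunk
}

// e.g. a 1MB message was parsed and only 10 bytes of the next, partial
// message remain: keep the 10 bytes, not the whole megabyte.
const grownBuffer = Buffer.alloc(1024 * 1024, 7)
const tail = shrinkToRemaining(grownBuffer, grownBuffer.length - 10, grownBuffer.length)
console.log(tail.length) // 10
```

Once the original buffer is no longer referenced, the garbage collector can reclaim it, at the cost of one allocation and copy per partial message.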
Memory Management Flow
Before Fix: the internal buffer grows to fit the largest message received and is retained at that size for the lifetime of the parser, even after its data has been consumed.
After Fix: when only a partial message remains, the leftover bytes are copied into a new buffer sized to fit them, so the oversized buffer can be garbage collected.
Impact
Testing
The fix maintains full backward compatibility, and all existing tests should pass. The memory management improvements will be most noticeable in long-running applications that receive very large messages on the same connection:
Files Changed:
packages/pg-protocol/src/parser.ts
- Fixed buffer shrinking logic to prevent memory leaks