From 51e317787ed9c9207cf98ac8e9d73c342e9c4554 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Mon, 18 Aug 2025 22:53:35 +0200 Subject: [PATCH 1/3] 1.4.1-librdkafka-2.11.1 (#357) --- .semaphore/semaphore.yml | 2 +- lib/error.js | 2 +- lib/util.js | 2 +- package-lock.json | 6 +++--- package.json | 6 +++--- schemaregistry/package.json | 2 +- types/config.d.ts | 2 +- types/errors.d.ts | 2 +- 8 files changed, 12 insertions(+), 12 deletions(-) diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index 52a5bdcc..915f148c 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -28,7 +28,7 @@ global_job_config: - git submodule update --init --recursive - cd deps/librdkafka - git fetch origin - - git checkout v2.11.1-RC2 + - git checkout v2.11.1 - cd ../../ - cache clear diff --git a/lib/error.js b/lib/error.js index edf0cc88..d9d468c0 100644 --- a/lib/error.js +++ b/lib/error.js @@ -28,7 +28,7 @@ LibrdKafkaError.wrap = errorWrap; * @constant * @memberof RdKafka */ -// ====== Generated from librdkafka 2.11.1-RC2 file src-cpp/rdkafkacpp.h ====== +// ====== Generated from librdkafka 2.11.1 file src-cpp/rdkafkacpp.h ====== LibrdKafkaError.codes = { /* Internal errors to rdkafka: */ diff --git a/lib/util.js b/lib/util.js index c300a340..3890df5b 100644 --- a/lib/util.js +++ b/lib/util.js @@ -52,4 +52,4 @@ util.dictToStringList = function (mapOrObject) { return list; }; -util.bindingVersion = '1.4.1-rc2'; +util.bindingVersion = '1.4.1'; diff --git a/package-lock.json b/package-lock.json index cfd8ca85..a20c3bf4 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@confluentinc/kafka-javascript", - "version": "1.4.1-rc2", + "version": "1.4.1", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@confluentinc/kafka-javascript", - "version": "1.4.1-rc2", + "version": "1.4.1", "hasInstallScript": true, "license": "MIT", "workspaces": [ @@ -11889,7 +11889,7 @@ }, "schemaregistry": { "name": "@confluentinc/schemaregistry", - "version": "1.4.1-rc2", + "version": "1.4.1", "license": "MIT", "dependencies": { "@aws-sdk/client-kms": "^3.637.0", diff --git a/package.json b/package.json index 0ac5b587..012f1c92 100644 --- a/package.json +++ b/package.json @@ -1,9 +1,9 @@ { "name": "@confluentinc/kafka-javascript", - "version": "1.4.1-rc2", + "version": "1.4.1", "description": "Node.js bindings for librdkafka", - "librdkafka": "2.11.1-RC2", - "librdkafka_win": "2.11.1-RC2", + "librdkafka": "2.11.1", + "librdkafka_win": "2.11.1", "main": "lib/index.js", "types": "types/index.d.ts", "scripts": { diff --git a/schemaregistry/package.json b/schemaregistry/package.json index 9bcf6096..0a49c87d 100644 --- a/schemaregistry/package.json +++ b/schemaregistry/package.json @@ -1,6 +1,6 @@ { "name": "@confluentinc/schemaregistry", - "version": "1.4.1-rc2", + "version": "1.4.1", "description": "Node.js client for Confluent Schema Registry", "main": "dist/index.js", "types": "dist/index.d.ts", diff --git a/types/config.d.ts b/types/config.d.ts index 280791c6..0df9d404 100644 --- a/types/config.d.ts +++ b/types/config.d.ts @@ -1,4 +1,4 @@ -// ====== Generated from librdkafka 2.11.1-RC2 file CONFIGURATION.md ====== +// ====== Generated from librdkafka 2.11.1 file CONFIGURATION.md ====== // Code that generated this is a derivative work of the code from Nam Nguyen // https://gist.github.com/ntgn81/066c2c8ec5b4238f85d1e9168a04e3fb diff --git a/types/errors.d.ts b/types/errors.d.ts index af7a7dca..fba2f51a 100644 --- a/types/errors.d.ts +++ b/types/errors.d.ts @@ -1,4 +1,4 @@ -// ====== Generated from librdkafka 2.11.1-RC2 file src-cpp/rdkafkacpp.h ====== +// ====== Generated from librdkafka 2.11.1 file src-cpp/rdkafkacpp.h ====== export const CODES: { ERRORS: { /* Internal errors to rdkafka: */ /** Begin internal error codes (**-200**) */ From 43f3ed9c1ccc0103180bf53ce458062b49842ae4 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Tue, 19 Aug 2025 11:17:34 -0700 Subject: [PATCH 2/3] First cut (#352) --- .../rules/encryption/encrypt-executor.ts | 71 +++++++++++++++++-- schemaregistry/test/serde/avro.spec.ts | 63 ++++++++++++++++ 2 files changed, 130 insertions(+), 4 deletions(-) diff --git a/schemaregistry/rules/encryption/encrypt-executor.ts b/schemaregistry/rules/encryption/encrypt-executor.ts index 0e718705..aca488df 100644 --- a/schemaregistry/rules/encryption/encrypt-executor.ts +++ b/schemaregistry/rules/encryption/encrypt-executor.ts @@ -33,6 +33,8 @@ const ENCRYPT_KMS_TYPE = 'encrypt.kms.type' const ENCRYPT_DEK_ALGORITHM = 'encrypt.dek.algorithm' // EncryptDekExpiryDays represents dek expiry days const ENCRYPT_DEK_EXPIRY_DAYS = 'encrypt.dek.expiry.days' +// EncryptAlternateKmsKeyIds represents alternate kms key IDs +const ENCRYPT_ALTERNATE_KMS_KEY_IDS = 'encrypt.alternate.kms.key.ids' // MillisInDay represents number of milliseconds in a day const MILLIS_IN_DAY = 24 * 60 * 60 * 1000 @@ -387,7 +389,7 @@ export class EncryptionExecutorTransform { } let encryptedDek: Buffer | null = null if (!kek.shared) { - kmsClient = getKmsClient(this.executor.config!, kek) + kmsClient = new KmsClientWrapper(this.executor.config!, kek) // Generate new dek const rawDek = this.cryptor.generateKey() encryptedDek = await kmsClient.encrypt(rawDek) @@ -407,7 +409,7 @@ export class EncryptionExecutorTransform { const keyMaterialBytes = await this.executor.client!.getDekKeyMaterialBytes(dek) if (keyMaterialBytes == null) { if (kmsClient == null) { - kmsClient = getKmsClient(this.executor.config!, kek) + kmsClient = new KmsClientWrapper(this.executor.config!, kek) } const encryptedKeyMaterialBytes = await this.executor.client!.getDekEncryptedKeyMaterialBytes(dek) const rawDek = await kmsClient.decrypt(encryptedKeyMaterialBytes!) @@ -579,8 +581,8 @@ export class EncryptionExecutorTransform { } } -function getKmsClient(config: Map, kek: Kek): KmsClient { - let keyUrl = kek.kmsType + '://' + kek.kmsKeyId +function getKmsClient(config: Map, kmsType: string, kmsKeyId: string): KmsClient { + let keyUrl = kmsType + '://' + kmsKeyId let kmsClient = Registry.getKmsClient(keyUrl) if (kmsClient == null) { let kmsDriver = Registry.getKmsDriver(keyUrl) @@ -641,3 +643,64 @@ export class FieldEncryptionExecutorTransform implements FieldTransform { } } +export class KmsClientWrapper implements KmsClient { + private config: Map + private kek: Kek + private kekId: string + private kmsKeyIds: string[] + + constructor(config: Map, kek: Kek) { + this.config = config + this.kek = kek + this.kekId = kek.kmsType + '://' + kek.kmsKeyId + this.kmsKeyIds = this.getKmsKeyIds() + } + + getKmsKeyIds(): string[] { + let kmsKeyIds = [this.kek.kmsKeyId!] + let alternateKmsKeyIds: string | undefined + if (this.kek.kmsProps != null) { + alternateKmsKeyIds = this.kek.kmsProps[ENCRYPT_ALTERNATE_KMS_KEY_IDS] + } + if (alternateKmsKeyIds == null) { + alternateKmsKeyIds = this.config.get(ENCRYPT_ALTERNATE_KMS_KEY_IDS) + } + if (alternateKmsKeyIds != null) { + kmsKeyIds = kmsKeyIds.concat(alternateKmsKeyIds.split(',').map(id => id.trim())) + } + return kmsKeyIds + } + + supported(keyUri: string): boolean { + return this.kekId === keyUri + } + + async encrypt(rawKey: Buffer): Promise { + for (let i = 0; i < this.kmsKeyIds.length; i++) { + try { + let kmsClient = getKmsClient(this.config, this.kek.kmsType!, this.kmsKeyIds[i]) + return await kmsClient.encrypt(rawKey) + } catch (e) { + if (i === this.kmsKeyIds.length - 1) { + throw new RuleError(`failed to encrypt key with all KMS keys: ${e}`) + } + } + } + throw new RuleError('no KEK found for encryption') + } + + async decrypt(encryptedKey: Buffer): Promise { + for (let i = 0; i < this.kmsKeyIds.length; i++) { + try { + let kmsClient = getKmsClient(this.config, this.kek.kmsType!, this.kmsKeyIds[i]) + return await kmsClient.decrypt(encryptedKey) + } catch (e) { + if (i === this.kmsKeyIds.length - 1) { + throw new RuleError(`failed to decrypt key with all KMS keys: ${e}`) + } + } + } + throw new RuleError('no KEK found for decryption') + } +} + diff --git a/schemaregistry/test/serde/avro.spec.ts b/schemaregistry/test/serde/avro.spec.ts index a07d8e99..7580939e 100644 --- a/schemaregistry/test/serde/avro.spec.ts +++ b/schemaregistry/test/serde/avro.spec.ts @@ -1193,6 +1193,69 @@ describe('AvroSerializer', () => { expect(obj2.boolField).toEqual(obj.boolField); expect(obj2.bytesField).toEqual(obj.bytesField); }) + it('encryption with alternate keks', async () => { + let conf: ClientConfig = { + baseURLs: [baseURL], + cacheCapacity: 1000 + } + let client = SchemaRegistryClient.newClient(conf) + let serConfig: AvroSerializerConfig = { + useLatestVersion: true, + ruleConfig: { + secret: 'mysecret', + 'encrypt.alternate.kms.key.ids': 'mykey2,mykey3' + } + } + let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig) + let dekClient = encryptionExecutor.client! + + let encRule: Rule = { + name: 'test-encrypt', + kind: 'TRANSFORM', + mode: RuleMode.WRITEREAD, + type: 'ENCRYPT_PAYLOAD', + params: { + 'encrypt.kek.name': 'kek1', + 'encrypt.kms.type': 'local-kms', + 'encrypt.kms.key.id': 'mykey', + }, + onFailure: 'ERROR,NONE' + } + let ruleSet: RuleSet = { + encodingRules: [encRule] + } + + let info: SchemaInfo = { + schemaType: 'AVRO', + schema: demoSchema, + ruleSet + } + + await client.register(subject, info, false) + + let obj = { + intField: 123, + doubleField: 45.67, + stringField: 'hi', + boolField: true, + bytesField: Buffer.from([1, 2]), + } + let bytes = await ser.serialize(topic, obj) + + let deserConfig: AvroDeserializerConfig = { + ruleConfig: { + secret: 'mysecret' + } + } + let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig) + encryptionExecutor.client = dekClient + let obj2 = await deser.deserialize(topic, bytes) + expect(obj2.intField).toEqual(obj.intField); + expect(obj2.doubleField).toBeCloseTo(obj.doubleField, 0.001); + expect(obj2.stringField).toEqual(obj.stringField); + expect(obj2.boolField).toEqual(obj.boolField); + expect(obj2.bytesField).toEqual(obj.bytesField); + }) it('deterministic encryption', async () => { let conf: ClientConfig = { baseURLs: [baseURL], From 317e1236089555bc3b721dd68b8d46e7c19b1b1f Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Fri, 22 Aug 2025 13:58:56 -0700 Subject: [PATCH 3/3] DGS-21988 Fix transformation of nullable JSON props (#359) * DGS-21988 Fix transformation of nullable JSON props * Minor cleanup * Minor cleanup --- schemaregistry/serde/json.ts | 30 +++++++ schemaregistry/test/serde/json.spec.ts | 119 +++++++++++++++++++++++++ 2 files changed, 149 insertions(+) diff --git a/schemaregistry/serde/json.ts b/schemaregistry/serde/json.ts index 93736f80..93a1c0d9 100644 --- a/schemaregistry/serde/json.ts +++ b/schemaregistry/serde/json.ts @@ -332,6 +332,17 @@ async function transform(ctx: RuleContext, schema: DereferencedJSONSchema, path: if (fieldCtx != null) { fieldCtx.type = getType(schema) } + if (schema.type != null && Array.isArray(schema.type) && schema.type.length > 0) { + let originalType = schema.type + let subschema = validateSubtypes(schema, msg) + try { + if (subschema != null) { + return await transform(ctx, subschema, path, msg, fieldTransform) + } + } finally { + schema.type = originalType + } + } if (schema.allOf != null && schema.allOf.length > 0) { let subschema = validateSubschemas(schema.allOf, msg) if (subschema != null) { @@ -406,6 +417,25 @@ async function transformField(ctx: RuleContext, path: string, propName: string, } } +function validateSubtypes(schema: DereferencedJSONSchema, msg: any): DereferencedJSONSchema | null { + if (typeof schema === 'boolean') { + return null + } + if (schema.type == null || !Array.isArray(schema.type) || schema.type.length === 0) { + return null + } + for (let typ of schema.type) { + schema.type = typ + try { + validateJSON(msg, schema) + return schema + } catch (error) { + // ignore + } + } + return null +} + function validateSubschemas(subschemas: DereferencedJSONSchema[], msg: any): DereferencedJSONSchema | null { for (let subschema of subschemas) { try { diff --git a/schemaregistry/test/serde/json.spec.ts b/schemaregistry/test/serde/json.spec.ts index 81623752..0af7fff0 100644 --- a/schemaregistry/test/serde/json.spec.ts +++ b/schemaregistry/test/serde/json.spec.ts @@ -25,9 +25,13 @@ import {RuleRegistry} from "@confluentinc/schemaregistry/serde/rule-registry"; import stringify from "json-stringify-deterministic"; import {JsonataExecutor} from "@confluentinc/schemaregistry/rules/jsonata/jsonata-executor"; import {clearKmsClients} from "@confluentinc/schemaregistry/rules/encryption/kms-registry"; +import {CelExecutor} from "../../rules/cel/cel-executor"; +import {CelFieldExecutor} from "../../rules/cel/cel-field-executor"; const encryptionExecutor = EncryptionExecutor.register() const fieldEncryptionExecutor = FieldEncryptionExecutor.register() +CelExecutor.register() +CelFieldExecutor.register() JsonataExecutor.register() LocalKmsDriver.register() @@ -73,6 +77,25 @@ const demoSchema = ` } } ` +const demoSchemaWithNullable = ` +{ + "type": "object", + "properties": { + "intField": { "type": "integer" }, + "doubleField": { "type": "number" }, + "stringField": { + "type": ["string", "null"], + "confluent:tags": [ "PII" ] + }, + "boolField": { "type": "boolean" }, + "bytesField": { + "type": "string", + "contentEncoding": "base64", + "confluent:tags": [ "PII" ] + } + } +} +` const demoSchemaWithUnion = ` { "type": "object", @@ -424,6 +447,102 @@ describe('JsonSerializer', () => { await expect(() => ser.serialize(topic, diffObj)).rejects.toThrow(SerializationError) }) + it('cel field transform', async () => { + let conf: ClientConfig = { + baseURLs: [baseURL], + cacheCapacity: 1000 + } + let client = SchemaRegistryClient.newClient(conf) + let serConfig: JsonSerializerConfig = { + useLatestVersion: true, + } + let ser = new JsonSerializer(client, SerdeType.VALUE, serConfig) + + let encRule: Rule = { + name: 'test-cel', + kind: 'TRANSFORM', + mode: RuleMode.WRITE, + type: 'CEL_FIELD', + expr: "name == 'stringField' ; value + '-suffix'" + } + let ruleSet: RuleSet = { + domainRules: [encRule] + } + + let info: SchemaInfo = { + schemaType: 'JSON', + schema: demoSchema, + ruleSet + } + + await client.register(subject, info, false) + + let obj = { + intField: 123, + doubleField: 45.67, + stringField: 'hi', + boolField: true, + bytesField: Buffer.from([0, 0, 0, 1]).toString('base64') + } + let bytes = await ser.serialize(topic, obj) + + let deserConfig: JsonDeserializerConfig = {} + let deser = new JsonDeserializer(client, SerdeType.VALUE, deserConfig) + let obj2 = await deser.deserialize(topic, bytes) + expect(obj2.intField).toEqual(obj.intField); + expect(obj2.doubleField).toBeCloseTo(obj.doubleField, 0.001); + expect(obj2.stringField).toEqual('hi-suffix'); + expect(obj2.boolField).toEqual(obj.boolField); + expect(obj2.bytesField).toEqual(obj.bytesField); + }) + it('cel field transform with nullable', async () => { + let conf: ClientConfig = { + baseURLs: [baseURL], + cacheCapacity: 1000 + } + let client = SchemaRegistryClient.newClient(conf) + let serConfig: JsonSerializerConfig = { + useLatestVersion: true, + } + let ser = new JsonSerializer(client, SerdeType.VALUE, serConfig) + + let encRule: Rule = { + name: 'test-cel', + kind: 'TRANSFORM', + mode: RuleMode.WRITE, + type: 'CEL_FIELD', + expr: "name == 'stringField' ; value + '-suffix'" + } + let ruleSet: RuleSet = { + domainRules: [encRule] + } + + let info: SchemaInfo = { + schemaType: 'JSON', + schema: demoSchemaWithNullable, + ruleSet + } + + await client.register(subject, info, false) + + let obj = { + intField: 123, + doubleField: 45.67, + stringField: 'hi', + boolField: true, + bytesField: Buffer.from([0, 0, 0, 1]).toString('base64') + } + let bytes = await ser.serialize(topic, obj) + + let deserConfig: JsonDeserializerConfig = {} + let deser = new JsonDeserializer(client, SerdeType.VALUE, deserConfig) + let obj2 = await deser.deserialize(topic, bytes) + expect(obj2.intField).toEqual(obj.intField); + expect(obj2.doubleField).toBeCloseTo(obj.doubleField, 0.001); + expect(obj2.stringField).toEqual('hi-suffix'); + expect(obj2.boolField).toEqual(obj.boolField); + expect(obj2.bytesField).toEqual(obj.bytesField); + }) it('basic encryption', async () => { let conf: ClientConfig = { baseURLs: [baseURL],