MCPcopy
hub / github.com/nestjs/nest / publish

Method publish

packages/microservices/client/client-kafka.ts:389–431  ·  view source on GitHub ↗
(
    partialPacket: ReadPacket,
    callback: (packet: WritePacket) => any,
  )

Source from the content-addressed store, hash-verified

387 }
388
/**
 * Sends a request packet to Kafka and registers `callback` so that the
 * matching reply can be routed back to the caller.
 *
 * The callback is stored in `routingMap` under the id assigned to the packet
 * before anything is sent. If any later step fails, the entry is removed
 * again and `callback` is invoked with `{ err }`.
 *
 * @param partialPacket Packet to publish; an id is assigned to it here.
 * @param callback Handler registered for the reply; also receives `{ err }`
 *   when publishing fails.
 * @returns A teardown function that removes the routing entry. When the
 *   failure happens synchronously (before the send chain is started), the
 *   entry has already been removed and a no-op function is returned instead.
 */
protected publish(
  partialPacket: ReadPacket,
  callback: (packet: WritePacket) => any,
): () => void {
  const packet = this.assignPacketId(partialPacket);
  this.routingMap.set(packet.id, callback);

  // Drops the pending routing entry for this request.
  const unsubscribe = () => this.routingMap.delete(packet.id);

  // Shared failure path: forget the request, then report the error.
  const fail = (err: unknown) => {
    unsubscribe();
    callback({ err });
  };

  try {
    const pattern = this.normalizePattern(partialPacket.pattern);
    const replyTopic = this.getResponsePatternName(pattern);
    const replyPartition = this.getReplyTopicPartition(replyTopic);

    // Stamps the reply-routing headers on the serialized request and hands
    // it to the producer.
    const dispatch = (serializedPacket: KafkaRequest) => {
      serializedPacket.headers[KafkaHeaders.CORRELATION_ID] = packet.id;
      serializedPacket.headers[KafkaHeaders.REPLY_TOPIC] = replyTopic;
      serializedPacket.headers[KafkaHeaders.REPLY_PARTITION] = replyPartition;

      const record = {
        topic: pattern,
        messages: [serializedPacket],
      };
      // User-supplied send options are merged last, on top of the defaults.
      return this._producer!.send(
        Object.assign(record, this.options.send || {}),
      );
    };

    // serialize() is invoked synchronously so that a throwing serializer is
    // handled by the surrounding catch; Promise.resolve lets it return either
    // a plain value or a promise.
    const serialized = this.serializer.serialize(packet.data, { pattern });
    Promise.resolve(serialized)
      .then(dispatch)
      .catch(err => fail(err));

    return unsubscribe;
  } catch (err) {
    fail(err);
    return () => null;
  }
}
432
433 protected getResponsePatternName(pattern: string): string {
434 return `${pattern}.reply`;

Callers

nothing calls this directly

Calls 8

assignMethod · 0.80
setMethod · 0.65
catchMethod · 0.65
resolveMethod · 0.65
serializeMethod · 0.65
sendMethod · 0.45

Tested by

no test coverage detected