(
pattern: any,
data: { messages: TInput[] },
)
| 260 | } |
| 261 | |
| 262 | /** Emits a batch event: validates the arguments, then connects and hands { pattern, data } to dispatchBatchEvent. Returns the connectable Observable built below. */ public emitBatch<TResult = any, TInput = any>( |
| 263 | /* Passed through to dispatchBatchEvent unchanged. */ pattern: any, |
| 264 | /* Wrapper object whose messages array carries the batch. */ data: { messages: TInput[] }, |
| 265 | ): Observable<TResult> { |
| 266 | /* Guard: a nil pattern or nil data short-circuits and nothing is dispatched. Only data itself is checked, not data.messages. */ if (isNil(pattern) || isNil(data)) { |
| 267 | return _throw(() => new InvalidMessageException()); |
| 268 | } |
| 269 | /* connect() is wrapped in defer(...) and chained with mergeMap, so the dispatch runs only after connect() settles (presumably the rxjs defer/mergeMap; imports are not visible here, confirm). */ const source = defer(async () => this.connect()).pipe( |
| 270 | mergeMap(() => this.dispatchBatchEvent({ pattern, data })), |
| 271 | ); |
| 272 | /* Wrap the pipeline in a connectable backed by a fresh Subject. */ const connectableSource = connectable(source, { |
| 273 | connector: () => new Subject(), |
| 274 | resetOnDisconnect: false, |
| 275 | }); |
| 276 | /* connect() is invoked before returning, so the source is started even if the caller never subscribes (assuming rxjs connectable semantics, confirm). */ connectableSource.connect(); |
| 277 | return connectableSource; |
| 278 | } |
| 279 | |
| 280 | public commitOffsets( |
| 281 | topicPartitions: TopicPartitionOffsetAndMetadata[], |
no test coverage detected