(t *testing.T)
| 1967 | } |
| 1968 | |
| 1969 | func TestJetStreamTransform(t *testing.T) { |
| 1970 | s := RunBasicJetStreamServer() |
| 1971 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 1972 | nc, js := jsClient(t, s) |
| 1973 | defer nc.Close() |
| 1974 | |
| 1975 | ctx := context.Background() |
| 1976 | _, err := js.CreateStream(ctx, jetstream.StreamConfig{ |
| 1977 | Name: "ORIGIN", |
| 1978 | Subjects: []string{"test"}, |
| 1979 | SubjectTransform: &jetstream.SubjectTransformConfig{Source: ">", Destination: "transformed.>"}, |
| 1980 | Storage: jetstream.MemoryStorage, |
| 1981 | }) |
| 1982 | if err != nil { |
| 1983 | t.Fatalf("Unexpected error: %v", err) |
| 1984 | } |
| 1985 | err = nc.Publish("test", []byte("1")) |
| 1986 | if err != nil { |
| 1987 | t.Fatalf("Unexpected error: %v", err) |
| 1988 | } |
| 1989 | sourcingStream, err := js.CreateStream(ctx, jetstream.StreamConfig{ |
| 1990 | Subjects: []string{}, |
| 1991 | Name: "SOURCING", |
| 1992 | Sources: []*jetstream.StreamSource{{Name: "ORIGIN", SubjectTransforms: []jetstream.SubjectTransformConfig{{Source: ">", Destination: "fromtest.>"}}}}, |
| 1993 | Storage: jetstream.MemoryStorage, |
| 1994 | }) |
| 1995 | if err != nil { |
| 1996 | t.Fatalf("Unexpected error: %v", err) |
| 1997 | } |
| 1998 | |
| 1999 | cons, err := sourcingStream.CreateConsumer(ctx, jetstream.ConsumerConfig{FilterSubject: "fromtest.>", MemoryStorage: true}) |
| 2000 | if err != nil { |
| 2001 | t.Fatalf("Unexpected error: %v", err) |
| 2002 | } |
| 2003 | m, err := cons.Next() |
| 2004 | if err != nil { |
| 2005 | t.Fatalf("Unexpected error: %v", err) |
| 2006 | } |
| 2007 | if m.Subject() != "fromtest.transformed.test" { |
| 2008 | t.Fatalf("the subject of the message doesn't match the expected fromtest.transformed.test: %s", m.Subject()) |
| 2009 | } |
| 2010 | } |
| 2011 | |
| 2012 | func TestStreamConfigMatches(t *testing.T) { |
| 2013 | srv := RunBasicJetStreamServer() |
nothing calls this directly
no test coverage detected