(t *testing.T)
| 115 | } |
| 116 | |
| 117 | func TestSimpleBrokerCommunication(t *testing.T) { |
| 118 | for _, tt := range brokerTestTable { |
| 119 | t.Run(tt.name, func(t *testing.T) { |
| 120 | Logger.Printf("Testing broker communication for %s", tt.name) |
| 121 | mb := NewMockBroker(t, 0) |
| 122 | mb.Returns(&mockEncoder{tt.response}) |
| 123 | pendingNotify := make(chan brokerMetrics) |
| 124 | // Register a callback to be notified about successful requests |
| 125 | mb.SetNotifier(func(bytesRead, bytesWritten int) { |
| 126 | pendingNotify <- brokerMetrics{bytesRead, bytesWritten} |
| 127 | }) |
| 128 | broker := NewBroker(mb.Addr()) |
| 129 | // Set the broker id in order to validate local broker metrics |
| 130 | broker.id = 0 |
| 131 | conf := NewTestConfig() |
| 132 | conf.ApiVersionsRequest = false |
| 133 | conf.Version = tt.version |
| 134 | err := broker.Open(conf) |
| 135 | if err != nil { |
| 136 | t.Fatal(err) |
| 137 | } |
| 138 | if _, err := broker.Connected(); err != nil { |
| 139 | t.Error(err) |
| 140 | } |
| 141 | tt.runner(t, broker) |
| 142 | // Wait up to 500 ms for the remote broker to process the request and |
| 143 | // notify us about the metrics |
| 144 | timeout := 500 * time.Millisecond |
| 145 | select { |
| 146 | case mockBrokerMetrics := <-pendingNotify: |
| 147 | validateBrokerMetrics(t, broker, mockBrokerMetrics) |
| 148 | case <-time.After(timeout): |
| 149 | t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout) |
| 150 | } |
| 151 | mb.Close() |
| 152 | err = broker.Close() |
| 153 | if err != nil { |
| 154 | t.Error(err) |
| 155 | } |
| 156 | }) |
| 157 | } |
| 158 | } |
| 159 | |
| 160 | func TestBrokerFailedRequest(t *testing.T) { |
| 161 | for _, tt := range brokerFailedReqTestTable { |
nothing calls this directly
no test coverage detected