MCPcopy
hub / github.com/segmentio/kafka-go / testConnWriteReadConcurrently

Function testConnWriteReadConcurrently

conn_test.go:942–983  ·  view source on GitHub ↗
(t *testing.T, conn *Conn)

Source from the content-addressed store, hash-verified

940}
941
942func testConnWriteReadConcurrently(t *testing.T, conn *Conn) {
943 const N = 1000
944 msgs := make([]string, N)
945 done := make(chan struct{})
946 written := make(chan struct{}, N/10)
947
948 for i := 0; i != N; i++ {
949 msgs[i] = strconv.Itoa(i)
950 }
951
952 go func() {
953 defer close(done)
954 for _, msg := range msgs {
955 if _, err := conn.Write([]byte(msg)); err != nil {
956 t.Error(err)
957 }
958 written <- struct{}{}
959 }
960 }()
961
962 b := make([]byte, 128)
963
964 for i := 0; i != N; i++ {
965 // wait until at least one message has been written. the reason for
966 // this synchronization is that we aren't using deadlines. as such, if
967 // the read happens before a message is available, it will cause a
968 // deadlock because the read request will never hit the one byte minimum
969 // in order to return and release the lock on the conn. by ensuring
970 // that there's at least one message produced, we don't hit that
971 // condition.
972 <-written
973 n, err := conn.Read(b)
974 if err != nil {
975 t.Error(err)
976 }
977 if s := string(b[:n]); s != strconv.Itoa(i) {
978 t.Errorf("bad message read at offset %d: %s", i, s)
979 }
980 }
981
982 <-done
983}
984
985func testConnReadShortBuffer(t *testing.T, conn *Conn) {
986 if _, err := conn.Write([]byte("Hello World!")); err != nil {

Callers

nothing calls this directly

Calls 3

WriteMethod · 0.45
ErrorMethod · 0.45
ReadMethod · 0.45

Tested by

no test coverage detected