(w http.ResponseWriter, r *http.Request, opts *ClientInitOpts)
// InstrumentationLibrary is the identifier string "dagger.io/engine.server".
// NOTE(review): presumably the OpenTelemetry instrumentation scope name for
// telemetry emitted by this package; no use is visible in this chunk, so
// confirm against its callers.
| 1154 | const InstrumentationLibrary = "dagger.io/engine.server" |
| 1155 | |
| 1156 | func (srv *Server) serveHTTPToClient(w http.ResponseWriter, r *http.Request, opts *ClientInitOpts) (rerr error) { |
| 1157 | if srv.isShuttingDown() { |
| 1158 | switch r.URL.Path { |
| 1159 | case engine.QueryEndpoint: |
| 1160 | return gqlErr(errServerShuttingDown, http.StatusServiceUnavailable) |
| 1161 | default: |
| 1162 | return httpErr(errServerShuttingDown, http.StatusServiceUnavailable) |
| 1163 | } |
| 1164 | } |
| 1165 | |
| 1166 | ctx := srv.withShutdownCancel(r.Context()) |
| 1167 | |
| 1168 | ctx, cancel := context.WithCancelCause(ctx) |
| 1169 | defer cancel(fmt.Errorf("http request done for client %q", opts.ClientID)) |
| 1170 | |
| 1171 | clientMetadata := opts.ClientMetadata |
| 1172 | ctx = engine.ContextWithClientMetadata(ctx, clientMetadata) |
| 1173 | |
| 1174 | // propagate span context and baggage from the client |
| 1175 | ctx = telemetry.Propagator.Extract(ctx, propagation.HeaderCarrier(r.Header)) |
| 1176 | |
| 1177 | // Check if repeated telemetry is requested via baggage |
| 1178 | baggage := baggage.FromContext(ctx) |
| 1179 | if member := baggage.Member("repeat-telemetry"); member.Value() != "" { |
| 1180 | ctx = dagql.WithRepeatedTelemetry(ctx) |
| 1181 | } |
| 1182 | |
| 1183 | ctx = bklog.WithLogger(ctx, bklog.G(ctx). |
| 1184 | WithField("trace", trace.SpanContextFromContext(ctx).TraceID().String()). |
| 1185 | WithField("span", trace.SpanContextFromContext(ctx).SpanID().String()). |
| 1186 | WithField("client_id", clientMetadata.ClientID). |
| 1187 | WithField("client_hostname", clientMetadata.ClientHostname). |
| 1188 | WithField("session_id", clientMetadata.SessionID)) |
| 1189 | ctx = slog.WithLogger(ctx, slog.FromContext(ctx).With( |
| 1190 | "client_id", clientMetadata.ClientID, |
| 1191 | "client_hostname", clientMetadata.ClientHostname, |
| 1192 | "session_id", clientMetadata.SessionID, |
| 1193 | "trace", trace.SpanContextFromContext(ctx).TraceID().String(), |
| 1194 | "span", trace.SpanContextFromContext(ctx).SpanID().String(), |
| 1195 | )) |
| 1196 | ctx = dagql.ContextWithCache(ctx, srv.engineCache) |
| 1197 | |
| 1198 | // Debug https://github.com/dagger/dagger/issues/7592 by logging method and some headers, which |
| 1199 | // are checked by gqlgen's handler |
| 1200 | bklog.G(ctx).WithFields(logrus.Fields{ |
| 1201 | "path": r.URL.Path, |
| 1202 | "method": r.Method, |
| 1203 | "upgradeHeader": r.Header.Get("Upgrade"), |
| 1204 | "contentType": r.Header.Get("Content-Type"), |
| 1205 | "trace": trace.SpanContextFromContext(ctx).TraceID().String(), |
| 1206 | "span": trace.SpanContextFromContext(ctx).SpanID().String(), |
| 1207 | }).Debug("handling http request") |
| 1208 | |
| 1209 | mux := http.NewServeMux() |
| 1210 | switch r.URL.Path { |
| 1211 | case "/v1/traces", "/v1/logs", "/v1/metrics": |
| 1212 | // Just get the client if it exists, don't init it. |
| 1213 | client, err := srv.clientFromIDs(clientMetadata.SessionID, clientMetadata.ClientID) |
nothing calls this directly
no test coverage detected