MCPcopy
hub / github.com/cortexlabs/cortex / CreateWorkload

Method CreateWorkload

pkg/async-gateway/service.go:57–92  ·  view source on GitHub ↗

CreateWorkload enqueues an async workload request and uploads the request payload to S3

(id string, apiName string, queueURL string, payload io.Reader, headers http.Header)

Source from the content-addressed store, hash-verified

55
56// CreateWorkload enqueues an async workload request and uploads the request payload to S3
57func (s *service) CreateWorkload(id string, apiName string, queueURL string, payload io.Reader, headers http.Header) (string, error) {
58 prefix := async.StoragePath(s.clusterUID, apiName)
59 log := s.logger.With(zap.String("id", id), zap.String("apiName", apiName))
60
61 buf := &bytes.Buffer{}
62 if err := json.NewEncoder(buf).Encode(headers); err != nil {
63 return "", errors.Wrap(err, "failed to dump headers")
64 }
65
66 headersPath := async.HeadersPath(prefix, id)
67 log.Debugw("uploading headers", zap.String("path", headersPath))
68 if err := s.storage.Upload(headersPath, buf, "application/json"); err != nil {
69 return "", errors.Wrap(err, "failed to upload headers")
70 }
71
72 contentType := headers.Get("Content-Type")
73 payloadPath := async.PayloadPath(prefix, id)
74 log.Debugw("uploading payload", zap.String("path", payloadPath))
75 if err := s.storage.Upload(payloadPath, payload, contentType); err != nil {
76 return "", errors.Wrap(err, "failed to upload payload")
77 }
78
79 log.Debug("sending message to queue")
80 queue := NewSQS(queueURL, &s.session)
81 if err := queue.SendMessage(id, id); err != nil {
82 return "", errors.Wrap(err, "failed to send message to queue")
83 }
84
85 statusPath := fmt.Sprintf("%s/%s/status/%s", prefix, id, async.StatusInQueue)
86 log.Debug(fmt.Sprintf("setting status to %s", async.StatusInQueue))
87 if err := s.storage.Upload(statusPath, strings.NewReader(""), "text/plain"); err != nil {
88 return "", errors.Wrap(err, "failed to upload workload status")
89 }
90
91 return id, nil
92}
93
94// GetWorkload retrieves the status and result, if available, of a given workload
95func (s *service) GetWorkload(id string, apiName string) (GetWorkloadResponse, error) {

Callers

nothing calls this directly

Calls 8

StoragePathFunction · 0.92
WrapFunction · 0.92
HeadersPathFunction · 0.92
PayloadPathFunction · 0.92
NewSQSFunction · 0.85
UploadMethod · 0.65
SendMessageMethod · 0.65
StringMethod · 0.45

Tested by

no test coverage detected