diff --git a/src/github.com/matrix-org/dendrite/mediaapi/routing/routing.go b/src/github.com/matrix-org/dendrite/mediaapi/routing/routing.go index 0c1dce6f8..7641109c3 100644 --- a/src/github.com/matrix-org/dendrite/mediaapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/mediaapi/routing/routing.go @@ -16,7 +16,6 @@ package routing import ( "net/http" - "sync" "github.com/gorilla/mux" "github.com/matrix-org/dendrite/common" @@ -42,7 +41,7 @@ func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg *config.MediaAPI })) activeRemoteRequests := &types.ActiveRemoteRequests{ - MXCToCond: map[string]*sync.Cond{}, + MXCToResult: map[string]*types.RemoteRequestResult{}, } r0mux.Handle("/download/{serverName}/{mediaId}", prometheus.InstrumentHandler("download", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { diff --git a/src/github.com/matrix-org/dendrite/mediaapi/types/types.go b/src/github.com/matrix-org/dendrite/mediaapi/types/types.go index ac18f5fe2..d54bcdf67 100644 --- a/src/github.com/matrix-org/dendrite/mediaapi/types/types.go +++ b/src/github.com/matrix-org/dendrite/mediaapi/types/types.go @@ -18,6 +18,7 @@ import ( "sync" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" ) // FileSizeBytes is a file size in bytes @@ -59,10 +60,20 @@ type MediaMetadata struct { UserID MatrixUserID } +// RemoteRequestResult is used for broadcasting the result of a request for a remote file to routines waiting on the condition +type RemoteRequestResult struct { + // Condition used for the requester to signal the result to all other routines waiting on this condition + Cond *sync.Cond + // MediaMetadata of the requested file to avoid querying the database for every waiting routine + MediaMetadata *MediaMetadata + // An error in util.JSONResponse form. nil in case of no error. + ErrorResponse *util.JSONResponse +} + // ActiveRemoteRequests is a lockable map of media URIs requested from remote homeservers // It is used for ensuring multiple requests for the same file do not clobber each other. type ActiveRemoteRequests struct { sync.Mutex // The string key is an mxc:// URL - MXCToCond map[string]*sync.Cond + MXCToResult map[string]*RemoteRequestResult } diff --git a/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go b/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go index 9f66409c5..af31535b4 100644 --- a/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go +++ b/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go @@ -18,10 +18,13 @@ import ( "encoding/json" "fmt" "io" + "mime" "net/http" "os" + "path/filepath" "regexp" "strconv" + "sync" log "github.com/Sirupsen/logrus" "github.com/matrix-org/dendrite/clientapi/jsonerror" @@ -47,6 +50,10 @@ type downloadRequest struct { // Download implements /download // Files from this server (i.e. origin == cfg.ServerName) are served directly +// Files from remote servers (i.e. origin != cfg.ServerName) are cached locally. +// If they are present in the cache, they are served directly. +// If they are not present in the cache, they are obtained from the remote server and +// simultaneously served back to the client and written into the cache. func Download(w http.ResponseWriter, req *http.Request, origin gomatrixserverlib.ServerName, mediaID types.MediaID, cfg *config.MediaAPI, db *storage.Database, activeRemoteRequests *types.ActiveRemoteRequests) { r := &downloadRequest{ MediaMetadata: &types.MediaMetadata{ @@ -119,10 +126,8 @@ func (r *downloadRequest) doDownload(w http.ResponseWriter, cfg *config.MediaAPI mediaMetadata, err := db.GetMediaMetadata(r.MediaMetadata.MediaID, r.MediaMetadata.Origin) if err != nil { r.Logger.WithError(err).Error("Error querying the database.") - return &util.JSONResponse{ - Code: 500, - JSON: jsonerror.InternalServerError(), - } + resErr := jsonerror.InternalServerError() + return &resErr } if mediaMetadata == nil { if r.MediaMetadata.Origin == cfg.ServerName { @@ -132,14 +137,15 @@ func (r *downloadRequest) doDownload(w http.ResponseWriter, cfg *config.MediaAPI JSON: jsonerror.NotFound(fmt.Sprintf("File with media ID %q does not exist", r.MediaMetadata.MediaID)), } } - // TODO: If we do not have a record and the origin is remote, we need to fetch it and respond with that file - return &util.JSONResponse{ - Code: 404, - JSON: jsonerror.NotFound(fmt.Sprintf("File with media ID %q does not exist", r.MediaMetadata.MediaID)), + // If we do not have a record and the origin is remote, we need to fetch it and respond with that file + resErr := r.getRemoteFile(cfg, db, activeRemoteRequests) + if resErr != nil { + return resErr } + } else { + // If we have a record, we can respond from the local file + r.MediaMetadata = mediaMetadata } - // If we have a record, we can respond from the local file - r.MediaMetadata = mediaMetadata return r.respondFromLocalFile(w, cfg.AbsBasePath) } @@ -149,27 +155,21 @@ func (r *downloadRequest) respondFromLocalFile(w http.ResponseWriter, absBasePat filePath, err := fileutils.GetPathFromBase64Hash(r.MediaMetadata.Base64Hash, absBasePath) if err != nil { r.Logger.WithError(err).Error("Failed to get file path from metadata") - return &util.JSONResponse{ - Code: 500, - JSON: jsonerror.InternalServerError(), - } + resErr := jsonerror.InternalServerError() + return &resErr } file, err := os.Open(filePath) defer file.Close() if err != nil { r.Logger.WithError(err).Error("Failed to open file") - return &util.JSONResponse{ - Code: 500, - JSON: jsonerror.InternalServerError(), - } + resErr := jsonerror.InternalServerError() + return &resErr } stat, err := file.Stat() if err != nil { r.Logger.WithError(err).Error("Failed to stat file") - return &util.JSONResponse{ - Code: 500, - JSON: jsonerror.InternalServerError(), - } + resErr := jsonerror.InternalServerError() + return &resErr } if r.MediaMetadata.FileSizeBytes > 0 && int64(r.MediaMetadata.FileSizeBytes) != stat.Size() { @@ -177,10 +177,8 @@ func (r *downloadRequest) respondFromLocalFile(w http.ResponseWriter, absBasePat "fileSizeDatabase": r.MediaMetadata.FileSizeBytes, "fileSizeDisk": stat.Size(), }).Warn("File size in database and on-disk differ.") - return &util.JSONResponse{ - Code: 500, - JSON: jsonerror.InternalServerError(), - } + resErr := jsonerror.InternalServerError() + return &resErr } r.Logger.WithFields(log.Fields{ @@ -202,13 +200,253 @@ func (r *downloadRequest) respondFromLocalFile(w http.ResponseWriter, absBasePat if bytesResponded, err := io.Copy(w, file); err != nil { r.Logger.WithError(err).Warn("Failed to copy from cache") if bytesResponded == 0 { - return &util.JSONResponse{ - Code: 500, - JSON: jsonerror.NotFound(fmt.Sprintf("Failed to respond with file with media ID %q", r.MediaMetadata.MediaID)), - } + resErr := jsonerror.InternalServerError() + return &resErr } // If we have written any data then we have already responded with 200 OK and all we can do is close the connection return nil } return nil } + +// getRemoteFile fetches the remote file and caches it locally +// A hash map of active remote requests to a struct containing a sync.Cond is used to only download remote files once, +// regardless of how many download requests are received. +// Note: The named errorResponse return variable is used in a deferred broadcast of the metadata and error response to waiting goroutines. +// Returns a util.JSONResponse error in case of error +func (r *downloadRequest) getRemoteFile(cfg *config.MediaAPI, db *storage.Database, activeRemoteRequests *types.ActiveRemoteRequests) (errorResponse *util.JSONResponse) { + // Note: getMediaMetadataFromActiveRequest uses mutexes and conditions from activeRemoteRequests + mediaMetadata, resErr := r.getMediaMetadataFromActiveRequest(activeRemoteRequests) + if resErr != nil { + return resErr + } else if mediaMetadata != nil { + // If we got metadata from an active request, we can respond from the local file + r.MediaMetadata = mediaMetadata + } else { + // Note: This is an active request that MUST broadcastMediaMetadata to wake up waiting goroutines! + // Note: broadcastMediaMetadata uses mutexes and conditions from activeRemoteRequests + defer func() { + // Note: errorResponse is the named return variable so we wrap this in a closure to re-evaluate the arguments at defer-time + if err := recover(); err != nil { + resErr := jsonerror.InternalServerError() + r.broadcastMediaMetadata(activeRemoteRequests, &resErr) + panic(err) + } + r.broadcastMediaMetadata(activeRemoteRequests, errorResponse) + }() + + // check if we have a record of the media in our database + mediaMetadata, err := db.GetMediaMetadata(r.MediaMetadata.MediaID, r.MediaMetadata.Origin) + if err != nil { + r.Logger.WithError(err).Error("Error querying the database.") + resErr := jsonerror.InternalServerError() + return &resErr + } + + if mediaMetadata == nil { + // If we do not have a record, we need to fetch the remote file first and then respond from the local file + resErr := r.fetchRemoteFileAndStoreMetadata(cfg.AbsBasePath, cfg.MaxFileSizeBytes, db) + if resErr != nil { + return resErr + } + } else { + // If we have a record, we can respond from the local file + r.MediaMetadata = mediaMetadata + } + } + return +} + +func (r *downloadRequest) getMediaMetadataFromActiveRequest(activeRemoteRequests *types.ActiveRemoteRequests) (*types.MediaMetadata, *util.JSONResponse) { + // Check if there is an active remote request for the file + mxcURL := "mxc://" + string(r.MediaMetadata.Origin) + "/" + string(r.MediaMetadata.MediaID) + + activeRemoteRequests.Lock() + defer activeRemoteRequests.Unlock() + + if activeRemoteRequestResult, ok := activeRemoteRequests.MXCToResult[mxcURL]; ok { + r.Logger.Info("Waiting for another goroutine to fetch the remote file.") + + // NOTE: Wait unlocks and locks again internally. There is still a deferred Unlock() that will unlock this. + activeRemoteRequestResult.Cond.Wait() + if activeRemoteRequestResult.ErrorResponse != nil { + return nil, activeRemoteRequestResult.ErrorResponse + } + + if activeRemoteRequestResult.MediaMetadata == nil { + return nil, &util.JSONResponse{ + Code: 404, + JSON: jsonerror.NotFound("File not found."), + } + } + + return activeRemoteRequestResult.MediaMetadata, nil + } + + // No active remote request so create one + activeRemoteRequests.MXCToResult[mxcURL] = &types.RemoteRequestResult{ + Cond: &sync.Cond{L: activeRemoteRequests}, + } + + return nil, nil +} + +// broadcastMediaMetadata broadcasts the media metadata and error response to waiting goroutines +// Only the owner of the activeRemoteRequestResult for this origin and media ID should call this function. +func (r *downloadRequest) broadcastMediaMetadata(activeRemoteRequests *types.ActiveRemoteRequests, errorResponse *util.JSONResponse) { + activeRemoteRequests.Lock() + defer activeRemoteRequests.Unlock() + mxcURL := "mxc://" + string(r.MediaMetadata.Origin) + "/" + string(r.MediaMetadata.MediaID) + if activeRemoteRequestResult, ok := activeRemoteRequests.MXCToResult[mxcURL]; ok { + r.Logger.Info("Signalling other goroutines waiting for this goroutine to fetch the file.") + activeRemoteRequestResult.MediaMetadata = r.MediaMetadata + activeRemoteRequestResult.ErrorResponse = errorResponse + activeRemoteRequestResult.Cond.Broadcast() + } + delete(activeRemoteRequests.MXCToResult, mxcURL) +} + +// fetchRemoteFileAndStoreMetadata fetches the file from the remote server and stores its metadata in the database +func (r *downloadRequest) fetchRemoteFileAndStoreMetadata(absBasePath types.Path, maxFileSizeBytes types.FileSizeBytes, db *storage.Database) *util.JSONResponse { + finalPath, duplicate, resErr := r.fetchRemoteFile(absBasePath, maxFileSizeBytes) + if resErr != nil { + return resErr + } + + r.Logger.WithFields(log.Fields{ + "Base64Hash": r.MediaMetadata.Base64Hash, + "UploadName": r.MediaMetadata.UploadName, + "FileSizeBytes": r.MediaMetadata.FileSizeBytes, + "Content-Type": r.MediaMetadata.ContentType, + }).Info("Storing file metadata to media repository database") + + // FIXME: timeout db request + if err := db.StoreMediaMetadata(r.MediaMetadata); err != nil { + // If the file is a duplicate (has the same hash as an existing file) then + // there is valid metadata in the database for that file. As such we only + // remove the file if it is not a duplicate. + if duplicate == false { + finalDir := filepath.Dir(string(finalPath)) + fileutils.RemoveDir(types.Path(finalDir), r.Logger) + } + // NOTE: It should really not be possible to fail the uniqueness test here so + // there is no need to handle that separately + resErr := jsonerror.InternalServerError() + return &resErr + } + + // TODO: generate thumbnails + + r.Logger.WithFields(log.Fields{ + "UploadName": r.MediaMetadata.UploadName, + "Base64Hash": r.MediaMetadata.Base64Hash, + "FileSizeBytes": r.MediaMetadata.FileSizeBytes, + "Content-Type": r.MediaMetadata.ContentType, + }).Infof("Remote file cached") + + return nil +} + +func (r *downloadRequest) fetchRemoteFile(absBasePath types.Path, maxFileSizeBytes types.FileSizeBytes) (types.Path, bool, *util.JSONResponse) { + r.Logger.Info("Fetching remote file") + + // create request for remote file + resp, resErr := r.createRemoteRequest() + if resErr != nil { + return "", false, resErr + } + defer resp.Body.Close() + + // get metadata from request and set metadata on response + contentLength, err := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64) + if err != nil { + r.Logger.WithError(err).Warn("Failed to parse content length") + return "", false, &util.JSONResponse{ + Code: 502, + JSON: jsonerror.Unknown("Invalid response from remote server"), + } + } + if contentLength > int64(maxFileSizeBytes) { + return "", false, &util.JSONResponse{ + Code: 413, + JSON: jsonerror.Unknown(fmt.Sprintf("Remote file is too large (%v > %v bytes)", contentLength, maxFileSizeBytes)), + } + } + r.MediaMetadata.FileSizeBytes = types.FileSizeBytes(contentLength) + r.MediaMetadata.ContentType = types.ContentType(resp.Header.Get("Content-Type")) + _, params, err := mime.ParseMediaType(resp.Header.Get("Content-Disposition")) + if err == nil && params["filename"] != "" { + r.MediaMetadata.UploadName = types.Filename(params["filename"]) + } + + r.Logger.Info("Transferring remote file") + + // The file data is hashed but is NOT used as the MediaID, unlike in Upload. The hash is useful as a + // method of deduplicating files to save storage, as well as a way to conduct + // integrity checks on the file data in the repository. + // Data is truncated to maxFileSizeBytes. Content-Length was reported as 0 < Content-Length <= maxFileSizeBytes so this is OK. + hash, bytesWritten, tmpDir, err := fileutils.WriteTempFile(resp.Body, maxFileSizeBytes, absBasePath) + if err != nil { + r.Logger.WithError(err).WithFields(log.Fields{ + "MaxFileSizeBytes": maxFileSizeBytes, + }).Warn("Error while downloading file from remote server") + fileutils.RemoveDir(tmpDir, r.Logger) + return "", false, &util.JSONResponse{ + Code: 502, + JSON: jsonerror.Unknown("File could not be downloaded from remote server"), + } + } + + r.Logger.Info("Remote file transferred") + + // It's possible the bytesWritten to the temporary file is different to the reported Content-Length from the remote + // request's response. bytesWritten is therefore used as it is what would be sent to clients when reading from the local + // file. + r.MediaMetadata.FileSizeBytes = types.FileSizeBytes(bytesWritten) + r.MediaMetadata.Base64Hash = hash + + // The database is the source of truth so we need to have moved the file first + finalPath, duplicate, err := fileutils.MoveFileWithHashCheck(tmpDir, r.MediaMetadata, absBasePath, r.Logger) + if err != nil { + r.Logger.WithError(err).Error("Failed to move file.") + resErr := jsonerror.InternalServerError() + return "", false, &resErr + } + if duplicate { + r.Logger.WithField("dst", finalPath).Info("File was stored previously - discarding duplicate") + // Continue on to store the metadata in the database + } + + return types.Path(finalPath), duplicate, nil +} + +func (r *downloadRequest) createRemoteRequest() (*http.Response, *util.JSONResponse) { + matrixClient := gomatrixserverlib.NewClient() + + resp, err := matrixClient.CreateMediaDownloadRequest(r.MediaMetadata.Origin, string(r.MediaMetadata.MediaID)) + if err != nil { + r.Logger.WithError(err).Error("Failed to create download request") + return nil, &util.JSONResponse{ + Code: 502, + JSON: jsonerror.Unknown(fmt.Sprintf("File with media ID %q could not be downloaded from %q", r.MediaMetadata.MediaID, r.MediaMetadata.Origin)), + } + } + + if resp.StatusCode != 200 { + if resp.StatusCode == 404 { + return nil, &util.JSONResponse{ + Code: 404, + JSON: jsonerror.NotFound(fmt.Sprintf("File with media ID %q does not exist", r.MediaMetadata.MediaID)), + } + } + r.Logger.WithFields(log.Fields{ + "StatusCode": resp.StatusCode, + }).Warn("Received error response") + return nil, &util.JSONResponse{ + Code: 502, + JSON: jsonerror.Unknown(fmt.Sprintf("File with media ID %q could not be downloaded from %q", r.MediaMetadata.MediaID, r.MediaMetadata.Origin)), + } + } + + return resp, nil +} diff --git a/src/github.com/matrix-org/dendrite/mediaapi/writers/upload.go b/src/github.com/matrix-org/dendrite/mediaapi/writers/upload.go index dabd50073..f1838a559 100644 --- a/src/github.com/matrix-org/dendrite/mediaapi/writers/upload.go +++ b/src/github.com/matrix-org/dendrite/mediaapi/writers/upload.go @@ -136,10 +136,8 @@ func (r *uploadRequest) doUpload(reqReader io.Reader, cfg *config.MediaAPI, db * mediaMetadata, err := db.GetMediaMetadata(r.MediaMetadata.MediaID, r.MediaMetadata.Origin) if err != nil { r.Logger.WithError(err).Error("Error querying the database.") - return &util.JSONResponse{ - Code: 500, - JSON: jsonerror.InternalServerError(), - } + resErr := jsonerror.InternalServerError() + return &resErr } if mediaMetadata != nil { diff --git a/vendor/manifest b/vendor/manifest index e76c66ccf..b5725ca36 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -98,7 +98,7 @@ { "importpath": "github.com/matrix-org/gomatrixserverlib", "repository": "https://github.com/matrix-org/gomatrixserverlib", - "revision": "c396ef3cc1e546729f7052f1f48e345cc59269f4", + "revision": "b1dfcb3b345cc8410f1a03fec0a1ffe6bd002dcd", "branch": "master" }, { diff --git a/vendor/src/github.com/matrix-org/gomatrixserverlib/client.go b/vendor/src/github.com/matrix-org/gomatrixserverlib/client.go index 40c9ebe42..7d322f93e 100644 --- a/vendor/src/github.com/matrix-org/gomatrixserverlib/client.go +++ b/vendor/src/github.com/matrix-org/gomatrixserverlib/client.go @@ -220,3 +220,14 @@ func (fc *Client) LookupServerKeys( } return result, nil } + +// CreateMediaDownloadRequest creates a request for media on a homeserver and returns the http.Response or an error +func (fc *Client) CreateMediaDownloadRequest(matrixServer ServerName, mediaID string) (*http.Response, error) { + requestURL := "matrix://" + string(matrixServer) + "/_matrix/media/v1/download/" + string(matrixServer) + "/" + mediaID + resp, err := fc.client.Get(requestURL) + if err != nil { + return nil, err + } + + return resp, nil +}