mediaapi: Add database storage and upload handler

This commit is contained in:
Robert Swain 2017-05-26 16:49:54 +02:00
parent e21cd5ae66
commit 05e88d81cb
8 changed files with 561 additions and 7 deletions

View file

@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/mediaapi/config"
"github.com/matrix-org/dendrite/mediaapi/routing"
"github.com/matrix-org/dendrite/mediaapi/storage"
"github.com/matrix-org/dendrite/mediaapi/types"
"github.com/matrix-org/gomatrixserverlib"
@ -69,6 +70,11 @@ func main() {
DataSource: dataSource,
}
db, err := storage.Open(cfg.DataSource)
if err != nil {
log.WithError(err).Panic("Failed to open database")
}
log.WithFields(log.Fields{
"BASE_PATH": absBasePath,
"BIND_ADDRESS": bindAddr,
@ -78,6 +84,6 @@ func main() {
"SERVER_NAME": serverName,
}).Info("Starting mediaapi")
routing.Setup(http.DefaultServeMux, http.DefaultClient, cfg)
routing.Setup(http.DefaultServeMux, http.DefaultClient, cfg, db)
log.Fatal(http.ListenAndServe(bindAddr, nil))
}

View file

@ -0,0 +1,207 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fileutils
import (
"bufio"
"crypto/sha256"
"encoding/base64"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"strings"
log "github.com/Sirupsen/logrus"
"github.com/matrix-org/dendrite/mediaapi/types"
)
// RemoveDir removes a directory and logs a warning in case of errors
func RemoveDir(dir types.Path, logger *log.Entry) {
dirErr := os.RemoveAll(string(dir))
if dirErr != nil {
logger.WithError(dirErr).WithField("dir", dir).Warn("Failed to remove directory")
}
}
// createTempDir creates a tmp/<random string> directory within baseDirectory and returns its path
func createTempDir(baseDirectory types.Path) (types.Path, error) {
baseTmpDir := path.Join(string(baseDirectory), "tmp")
if err := os.MkdirAll(baseTmpDir, 0770); err != nil {
return "", fmt.Errorf("Failed to create base temp dir: %v", err)
}
tmpDir, err := ioutil.TempDir(baseTmpDir, "")
if err != nil {
return "", fmt.Errorf("Failed to create temp dir: %v", err)
}
return types.Path(tmpDir), nil
}
// createFileWriter creates a buffered file writer with a new file at directory/filename
// Returns the file handle as it needs to be closed when writing is complete
func createFileWriter(directory types.Path, filename types.Filename) (*bufio.Writer, *os.File, error) {
filePath := path.Join(string(directory), string(filename))
file, err := os.Create(filePath)
if err != nil {
return nil, nil, fmt.Errorf("Failed to create file: %v", err)
}
return bufio.NewWriter(file), file, nil
}
func createTempFileWriter(absBasePath types.Path) (*bufio.Writer, *os.File, types.Path, error) {
tmpDir, err := createTempDir(absBasePath)
if err != nil {
return nil, nil, "", fmt.Errorf("Failed to create temp dir: %q", err)
}
writer, tmpFile, err := createFileWriter(tmpDir, "content")
if err != nil {
return nil, nil, "", fmt.Errorf("Failed to create file writer: %q", err)
}
return writer, tmpFile, tmpDir, nil
}
var (
// ErrFileIsTooLarge indicates that the uploaded file is larger than the configured maximum file size
ErrFileIsTooLarge = fmt.Errorf("file is too large")
errRead = fmt.Errorf("failed to read response from remote server")
errResponse = fmt.Errorf("failed to write file data to response body")
errHash = fmt.Errorf("failed to hash file data")
errWrite = fmt.Errorf("failed to write file to disk")
)
// WriteTempFile writes to a new temporary file
func WriteTempFile(reqReader io.Reader, maxFileSizeBytes types.FileSizeBytes, absBasePath types.Path) (types.Base64Hash, types.FileSizeBytes, types.Path, error) {
tmpFileWriter, tmpFile, tmpDir, err := createTempFileWriter(absBasePath)
if err != nil {
return "", -1, "", err
}
defer tmpFile.Close()
limitedReader := io.LimitReader(reqReader, int64(maxFileSizeBytes))
// Hash the file data. The hash will be returned. The hash is useful as a
// method of deduplicating files to save storage, as well as a way to conduct
// integrity checks on the file data in the repository.
hasher := sha256.New()
teeReader := io.TeeReader(limitedReader, hasher)
bytesWritten, err := io.Copy(tmpFileWriter, teeReader)
if err != nil && err != io.EOF {
return "", -1, "", err
}
tmpFileWriter.Flush()
hash := hasher.Sum(nil)
return types.Base64Hash(base64.URLEncoding.EncodeToString(hash[:])), types.FileSizeBytes(bytesWritten), tmpDir, nil
}
// GetPathFromBase64Hash evaluates the path to a media file from its Base64Hash
// If the Base64Hash is long enough, we split it into pieces, creating up to 2 subdirectories
// for more manageable browsing and use the remainder as the file name.
// For example, if Base64Hash is 'qwerty', the path will be 'q/w/erty'.
func GetPathFromBase64Hash(base64Hash types.Base64Hash, absBasePath types.Path) (string, error) {
var subPath, fileName string
hashLen := len(base64Hash)
switch {
case hashLen < 1:
return "", fmt.Errorf("Invalid filePath (Base64Hash too short): %q", base64Hash)
case hashLen > 255:
return "", fmt.Errorf("Invalid filePath (Base64Hash too long - max 255 characters): %q", base64Hash)
case hashLen < 2:
subPath = ""
fileName = string(base64Hash)
case hashLen < 3:
subPath = string(base64Hash[0:1])
fileName = string(base64Hash[1:])
default:
subPath = path.Join(
string(base64Hash[0:1]),
string(base64Hash[1:2]),
)
fileName = string(base64Hash[2:])
}
filePath, err := filepath.Abs(path.Join(
string(absBasePath),
subPath,
fileName,
))
if err != nil {
return "", fmt.Errorf("Unable to construct filePath: %q", err)
}
// check if the absolute absBasePath is a prefix of the absolute filePath
// if so, no directory escape has occurred and the filePath is valid
// Note: absBasePath is already absolute
if strings.HasPrefix(filePath, string(absBasePath)) == false {
return "", fmt.Errorf("Invalid filePath (not within absBasePath %v): %v", absBasePath, filePath)
}
return filePath, nil
}
// moveFile attempts to move the file src to dst
func moveFile(src types.Path, dst types.Path) error {
dstDir := path.Dir(string(dst))
err := os.MkdirAll(dstDir, 0770)
if err != nil {
return fmt.Errorf("Failed to make directory: %q", err)
}
err = os.Rename(string(src), string(dst))
if err != nil {
return fmt.Errorf("Failed to move directory: %q", err)
}
return nil
}
// MoveFileWithHashCheck checks for hash collisions when moving a temporary file to its destination based on metadata
// Check if destination file exists. As the destination is based on a hash of the file data,
// if it exists and the file size does not match then there is a hash collision for two different files. If
// it exists and the file size matches, it is believable that it is the same file and we can just
// discard the temporary file.
func MoveFileWithHashCheck(tmpDir types.Path, mediaMetadata *types.MediaMetadata, absBasePath types.Path, logger *log.Entry) (string, bool, error) {
duplicate := false
finalPath, err := GetPathFromBase64Hash(mediaMetadata.Base64Hash, absBasePath)
if err != nil {
RemoveDir(tmpDir, logger)
return "", duplicate, fmt.Errorf("failed to get file path from metadata: %q", err)
}
var stat os.FileInfo
if stat, err = os.Stat(finalPath); os.IsExist(err) {
duplicate = true
if stat.Size() == int64(mediaMetadata.FileSizeBytes) {
RemoveDir(tmpDir, logger)
return finalPath, duplicate, nil
}
// Remove the tmpDir as we anyway cannot cache the file on disk due to the hash collision
RemoveDir(tmpDir, logger)
return "", duplicate, fmt.Errorf("downloaded file with hash collision but different file size (%v)", finalPath)
}
err = moveFile(
types.Path(path.Join(string(tmpDir), "content")),
types.Path(finalPath),
)
if err != nil {
RemoveDir(tmpDir, logger)
return "", duplicate, fmt.Errorf("failed to move file to final destination (%v): %q", finalPath, err)
}
return finalPath, duplicate, nil
}

View file

@ -21,6 +21,7 @@ import (
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/mediaapi/config"
"github.com/matrix-org/dendrite/mediaapi/storage"
"github.com/matrix-org/dendrite/mediaapi/types"
"github.com/matrix-org/dendrite/mediaapi/writers"
"github.com/matrix-org/gomatrixserverlib"
@ -32,11 +33,12 @@ const pathPrefixR0 = "/_matrix/media/v1"
// Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client
// to clients which need to make outbound HTTP requests.
func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg *config.MediaAPI) {
func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg *config.MediaAPI, db *storage.Database) {
apiMux := mux.NewRouter()
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
// FIXME: /upload should use common.MakeAuthAPI()
r0mux.Handle("/upload", common.MakeAPI("upload", func(req *http.Request) util.JSONResponse {
return writers.Upload(req, cfg)
return writers.Upload(req, cfg, db)
}))
activeRemoteRequests := &types.ActiveRemoteRequests{

View file

@ -0,0 +1,112 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"database/sql"
"time"
"github.com/matrix-org/dendrite/mediaapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
const mediaSchema = `
-- The media_repository table holds metadata for each media file stored and accessible to the local server,
-- the actual file is stored separately.
CREATE TABLE IF NOT EXISTS media_repository (
-- The id used to refer to the media.
-- For uploads to this server this is a base64-encoded sha256 hash of the file data
-- For media from remote servers, this can be any unique identifier string
media_id TEXT NOT NULL,
-- The origin of the media as requested by the client. Should be a homeserver domain.
media_origin TEXT NOT NULL,
-- The MIME-type of the media file as specified when uploading.
content_type TEXT NOT NULL,
-- The HTTP Content-Disposition header for the media file as specified when uploading.
content_disposition TEXT NOT NULL,
-- Size of the media file in bytes.
file_size_bytes BIGINT NOT NULL,
-- When the content was uploaded in UNIX epoch ms.
creation_ts BIGINT NOT NULL,
-- The file name with which the media was uploaded.
upload_name TEXT NOT NULL,
-- A golang base64 URLEncoding string representation of a SHA-256 hash sum of the file data.
base64hash TEXT NOT NULL,
-- The user who uploaded the file. Should be a Matrix user ID.
user_id TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS media_repository_index ON media_repository (media_id, media_origin);
`
const insertMediaSQL = `
INSERT INTO media_repository (media_id, media_origin, content_type, content_disposition, file_size_bytes, creation_ts, upload_name, base64hash, user_id)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
`
const selectMediaSQL = `
SELECT content_type, content_disposition, file_size_bytes, creation_ts, upload_name, base64hash, user_id FROM media_repository WHERE media_id = $1 AND media_origin = $2
`
type mediaStatements struct {
insertMediaStmt *sql.Stmt
selectMediaStmt *sql.Stmt
}
func (s *mediaStatements) prepare(db *sql.DB) (err error) {
_, err = db.Exec(mediaSchema)
if err != nil {
return
}
return statementList{
{&s.insertMediaStmt, insertMediaSQL},
{&s.selectMediaStmt, selectMediaSQL},
}.prepare(db)
}
func (s *mediaStatements) insertMedia(mediaMetadata *types.MediaMetadata) error {
mediaMetadata.CreationTimestamp = types.UnixMs(time.Now().UnixNano() / 1000000)
_, err := s.insertMediaStmt.Exec(
mediaMetadata.MediaID,
mediaMetadata.Origin,
mediaMetadata.ContentType,
mediaMetadata.ContentDisposition,
mediaMetadata.FileSizeBytes,
mediaMetadata.CreationTimestamp,
mediaMetadata.UploadName,
mediaMetadata.Base64Hash,
mediaMetadata.UserID,
)
return err
}
func (s *mediaStatements) selectMedia(mediaID types.MediaID, mediaOrigin gomatrixserverlib.ServerName) (*types.MediaMetadata, error) {
mediaMetadata := types.MediaMetadata{
MediaID: mediaID,
Origin: mediaOrigin,
}
err := s.selectMediaStmt.QueryRow(
mediaMetadata.MediaID, mediaMetadata.Origin,
).Scan(
&mediaMetadata.ContentType,
&mediaMetadata.ContentDisposition,
&mediaMetadata.FileSizeBytes,
&mediaMetadata.CreationTimestamp,
&mediaMetadata.UploadName,
&mediaMetadata.Base64Hash,
&mediaMetadata.UserID,
)
return &mediaMetadata, err
}

View file

@ -0,0 +1,37 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// FIXME: This should be made common!
package storage
import (
"database/sql"
)
// a statementList is a list of SQL statements to prepare and a pointer to where to store the resulting prepared statement.
type statementList []struct {
statement **sql.Stmt
sql string
}
// prepare the SQL for each statement in the list and assign the result to the prepared statement.
func (s statementList) prepare(db *sql.DB) (err error) {
for _, statement := range s {
if *statement.statement, err = db.Prepare(statement.sql); err != nil {
return
}
}
return
}

View file

@ -0,0 +1,33 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"database/sql"
)
type statements struct {
mediaStatements
}
func (s *statements) prepare(db *sql.DB) error {
var err error
if err = s.mediaStatements.prepare(db); err != nil {
return err
}
return nil
}

View file

@ -0,0 +1,56 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"database/sql"
// Import the postgres database driver.
_ "github.com/lib/pq"
"github.com/matrix-org/dendrite/mediaapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
// A Database is used to store metadata about a repository of media files.
type Database struct {
statements statements
db *sql.DB
}
// Open a postgres database.
func Open(dataSourceName string) (*Database, error) {
var d Database
var err error
if d.db, err = sql.Open("postgres", dataSourceName); err != nil {
return nil, err
}
if err = d.statements.prepare(d.db); err != nil {
return nil, err
}
return &d, nil
}
// StoreMediaMetadata inserts the metadata about the uploaded media into the database.
// Returns an error if the combination of MediaID and Origin are not unique in the table.
func (d *Database) StoreMediaMetadata(mediaMetadata *types.MediaMetadata) error {
return d.statements.insertMedia(mediaMetadata)
}
// GetMediaMetadata returns metadata about media stored on this server. The media could
// have been uploaded to this server or fetched from another server and cached here.
// Returns sql.ErrNoRows if there is no metadata associated with this media.
func (d *Database) GetMediaMetadata(mediaID types.MediaID, mediaOrigin gomatrixserverlib.ServerName) (*types.MediaMetadata, error) {
return d.statements.selectMedia(mediaID, mediaOrigin)
}

View file

@ -15,14 +15,18 @@
package writers
import (
"database/sql"
"fmt"
"net/http"
"net/url"
"path"
"strings"
log "github.com/Sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/mediaapi/config"
"github.com/matrix-org/dendrite/mediaapi/fileutils"
"github.com/matrix-org/dendrite/mediaapi/storage"
"github.com/matrix-org/dendrite/mediaapi/types"
"github.com/matrix-org/util"
)
@ -46,13 +50,75 @@ type uploadResponse struct {
// This implementation supports a configurable maximum file size limit in bytes. If a user tries to upload more than this, they will receive an error that their upload is too large.
// Uploaded files are processed piece-wise to avoid DoS attacks which would starve the server of memory.
// TODO: We should time out requests if they have not received any data within a configured timeout period.
func Upload(req *http.Request, cfg *config.MediaAPI) util.JSONResponse {
func Upload(req *http.Request, cfg *config.MediaAPI, db *storage.Database) util.JSONResponse {
r, resErr := parseAndValidateRequest(req, cfg)
if resErr != nil {
return *resErr
}
// doUpload
r.Logger.WithFields(log.Fields{
"Origin": r.MediaMetadata.Origin,
"UploadName": r.MediaMetadata.UploadName,
"FileSizeBytes": r.MediaMetadata.FileSizeBytes,
"Content-Type": r.MediaMetadata.ContentType,
"Content-Disposition": r.MediaMetadata.ContentDisposition,
}).Info("Uploading file")
// The file data is hashed and the hash is used as the MediaID. The hash is useful as a
// method of deduplicating files to save storage, as well as a way to conduct
// integrity checks on the file data in the repository.
hash, bytesWritten, tmpDir, copyError := fileutils.WriteTempFile(req.Body, cfg.MaxFileSizeBytes, cfg.AbsBasePath)
if copyError != nil {
logFields := log.Fields{
"Origin": r.MediaMetadata.Origin,
"MediaID": r.MediaMetadata.MediaID,
}
if copyError == fileutils.ErrFileIsTooLarge {
logFields["MaxFileSizeBytes"] = cfg.MaxFileSizeBytes
}
r.Logger.WithError(copyError).WithFields(logFields).Warn("Error while transferring file")
fileutils.RemoveDir(tmpDir, r.Logger)
return util.JSONResponse{
Code: 400,
JSON: jsonerror.Unknown(fmt.Sprintf("Failed to upload")),
}
}
r.MediaMetadata.FileSizeBytes = bytesWritten
r.MediaMetadata.Base64Hash = hash
r.MediaMetadata.MediaID = types.MediaID(hash)
r.Logger.WithFields(log.Fields{
"MediaID": r.MediaMetadata.MediaID,
"Origin": r.MediaMetadata.Origin,
"Base64Hash": r.MediaMetadata.Base64Hash,
"UploadName": r.MediaMetadata.UploadName,
"FileSizeBytes": r.MediaMetadata.FileSizeBytes,
"Content-Type": r.MediaMetadata.ContentType,
"Content-Disposition": r.MediaMetadata.ContentDisposition,
}).Info("File uploaded")
// check if we already have a record of the media in our database and if so, we can remove the temporary directory
mediaMetadata, err := db.GetMediaMetadata(r.MediaMetadata.MediaID, r.MediaMetadata.Origin)
if err == nil {
r.MediaMetadata = mediaMetadata
fileutils.RemoveDir(tmpDir, r.Logger)
return util.JSONResponse{
Code: 200,
JSON: uploadResponse{
ContentURI: fmt.Sprintf("mxc://%s/%s", cfg.ServerName, r.MediaMetadata.MediaID),
},
}
} else if err != sql.ErrNoRows {
r.Logger.WithError(err).WithField("MediaID", r.MediaMetadata.MediaID).Warn("Failed to query database")
}
// TODO: generate thumbnails
resErr = r.storeFileAndMetadata(tmpDir, cfg.AbsBasePath, db)
if resErr != nil {
return *resErr
}
return util.JSONResponse{
Code: 200,
@ -73,8 +139,6 @@ func parseAndValidateRequest(req *http.Request, cfg *config.MediaAPI) (*uploadRe
}
}
// authenticate user
r := &uploadRequest{
MediaMetadata: &types.MediaMetadata{
Origin: cfg.ServerName,
@ -149,3 +213,40 @@ func (r *uploadRequest) Validate(maxFileSizeBytes types.FileSizeBytes) *util.JSO
}
return nil
}
// storeFileAndMetadata first moves a temporary file named content from tmpDir to its
// final path (see getPathFromMediaMetadata for details.) Once the file is moved, the
// metadata about the file is written into the media repository database. This order
// of operations is important as it avoids metadata entering the database before the file
// is ready and if we fail to move the file, it never gets added to the database.
// In case of any error, appropriate files and directories are cleaned up a
// util.JSONResponse error is returned.
func (r *uploadRequest) storeFileAndMetadata(tmpDir types.Path, absBasePath types.Path, db *storage.Database) *util.JSONResponse {
finalPath, duplicate, err := fileutils.MoveFileWithHashCheck(tmpDir, r.MediaMetadata, absBasePath, r.Logger)
if err != nil {
r.Logger.WithError(err).Error("Failed to move file.")
return &util.JSONResponse{
Code: 400,
JSON: jsonerror.Unknown(fmt.Sprintf("Failed to upload")),
}
}
if duplicate {
r.Logger.WithField("dst", finalPath).Info("File was stored previously - discarding duplicate")
}
if err = db.StoreMediaMetadata(r.MediaMetadata); err != nil {
r.Logger.WithError(err).Warn("Failed to store metadata")
// If the file is a duplicate (has the same hash as an existing file) then
// there is valid metadata in the database for that file. As such we only
// remove the file if it is not a duplicate.
if duplicate == false {
fileutils.RemoveDir(types.Path(path.Dir(finalPath)), r.Logger)
}
return &util.JSONResponse{
Code: 400,
JSON: jsonerror.Unknown(fmt.Sprintf("Failed to upload")),
}
}
return nil
}