file_resource.go - afero - [fork] go afero port for 9front
(HTM) git clone https://git.drkhsh.at/afero.git
(DIR) Log
(DIR) Files
(DIR) Refs
(DIR) README
(DIR) LICENSE
---
file_resource.go (6677B)
---
1 // Copyright © 2021 Vasily Ovchinnikov <vasily@remerge.io>.
2 //
3 // The code in this file is derived from afero fork github.com/Zatte/afero by Mikael Rapp
4 // licensed under Apache License 2.0.
5 //
6 // Licensed under the Apache License, Version 2.0 (the "License");
7 // you may not use this file except in compliance with the License.
8 // You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16
17 package gcsfs
18
19 import (
20 "bytes"
21 "context"
22 "fmt"
23 "io"
24 "os"
25 "syscall"
26
27 "github.com/spf13/afero/gcsfs/internal/stiface"
28 )
29
30 const (
31 maxWriteSize = 10000
32 )
33
// gcsFileResource represents a singleton version of each GCS object;
// Google cloud storage allows users to open multiple writers(!) to the same
// underlying resource; once a writer is closed the written stream is
// committed. We are doing some magic where we read and write to the same
// file, which requires synchronization of the underlying resource.

type gcsFileResource struct {
	ctx context.Context // context used for every GCS call made on behalf of this resource

	fs *Fs // owning filesystem; used by ReadAt to stat the object (directory check)

	obj      stiface.ObjectHandle // handle to the underlying GCS object
	name     string               // object name, used to build FileInfo
	fileMode os.FileMode          // mode reported for this file

	// currentGcsSize caches the object size from the last Attrs call in
	// WriteAt; maybeCloseWriter uses it to decide whether a tail of the
	// original object must be appended before commit.
	currentGcsSize int64
	offset         int64          // current stream position shared by reader and writer
	reader         io.ReadCloser  // open read stream, or nil
	writer         io.WriteCloser // open write stream, or nil

	closed bool // set once Close has been called
}
56
// Close marks the resource as closed and closes any open reader/writer
// streams; closing the writer commits pending writes to GCS.
func (o *gcsFileResource) Close() error {
	o.closed = true
	// TODO rawGcsObjectsMap ?
	return o.maybeCloseIo()
}
62
63 func (o *gcsFileResource) maybeCloseIo() error {
64 if err := o.maybeCloseReader(); err != nil {
65 return fmt.Errorf("error closing reader: %v", err)
66 }
67 if err := o.maybeCloseWriter(); err != nil {
68 return fmt.Errorf("error closing writer: %v", err)
69 }
70
71 return nil
72 }
73
74 func (o *gcsFileResource) maybeCloseReader() error {
75 if o.reader == nil {
76 return nil
77 }
78 if err := o.reader.Close(); err != nil {
79 return err
80 }
81 o.reader = nil
82 return nil
83 }
84
85 func (o *gcsFileResource) maybeCloseWriter() error {
86 if o.writer == nil {
87 return nil
88 }
89
90 // In cases of partial writes (e.g. to the middle of a file stream), we need to
91 // append any remaining data from the original file before we close the reader (and
92 // commit the results.)
93 // For small writes it can be more efficient
94 // to keep the original reader but that is for another iteration
95 if o.currentGcsSize > o.offset {
96 currentFile, err := o.obj.NewRangeReader(o.ctx, o.offset, -1)
97 if err != nil {
98 return fmt.Errorf(
99 "couldn't simulate a partial write; the closing (and thus"+
100 " the whole file write) is NOT commited to GCS. %v", err)
101 }
102 if currentFile != nil && currentFile.Remain() > 0 {
103 if _, err := io.Copy(o.writer, currentFile); err != nil {
104 return fmt.Errorf("error writing: %v", err)
105 }
106 }
107 }
108
109 if err := o.writer.Close(); err != nil {
110 return err
111 }
112 o.writer = nil
113 return nil
114 }
115
// ReadAt reads up to len(p) bytes from the object starting at offset off,
// reusing the open reader when it is already positioned at off and
// otherwise (re)opening a range reader at that offset.
//
// NOTE(review): unlike the io.ReaderAt contract, a single underlying Read
// may return fewer than len(p) bytes without io.EOF — confirm callers
// tolerate short reads.
func (o *gcsFileResource) ReadAt(p []byte, off int64) (n int, err error) {
	// NOTE(review): checks cap, not len; presumably a zero-capacity buffer
	// is treated as "nothing to read" — a zero-length, nonzero-cap buffer
	// still falls through to a Read call.
	if cap(p) == 0 {
		return 0, nil
	}

	// Assume that if the reader is open; it is at the correct offset
	// a good performance assumption that we must ensure holds
	if off == o.offset && o.reader != nil {
		n, err = o.reader.Read(p)
		o.offset += int64(n)
		return n, err
	}

	// we have to check, whether it's a folder; the folder must not have an open readers, or writers though,
	// so this check should not be invoked excessively and cause too much of a performance drop
	if o.reader == nil && o.writer == nil {
		var info *FileInfo
		info, err = newFileInfo(o.name, o.fs, o.fileMode)
		if err != nil {
			return 0, err
		}

		if info.IsDir() {
			// trying to read a directory must return this
			return 0, syscall.EISDIR
		}
	}

	// If any writers have written anything; commit it first so we can read it back.
	if err = o.maybeCloseIo(); err != nil {
		return 0, err
	}

	// Then read at the correct offset.
	r, err := o.obj.NewRangeReader(o.ctx, off, -1)
	if err != nil {
		return 0, err
	}
	o.reader = r
	o.offset = off

	read, err := o.reader.Read(p)
	o.offset += int64(read)
	return read, err
}
161
// WriteAt writes b to the object starting at offset off. A write that is
// not a continuation of the currently open writer is simulated by copying
// the original object's prefix into a fresh writer (see the note below);
// the data is only committed to GCS when the writer is eventually closed.
//
// Returns ErrOutOfRange when off is beyond the current object size.
func (o *gcsFileResource) WriteAt(b []byte, off int64) (n int, err error) {
	// If the writer is opened and at the correct offset we're good!
	if off == o.offset && o.writer != nil {
		n, err = o.writer.Write(b)
		o.offset += int64(n)
		return n, err
	}

	// Ensure readers must be re-opened and that if a writer is active at another
	// offset it is first committed before we do a "seek" below
	if err = o.maybeCloseIo(); err != nil {
		return 0, err
	}

	w := o.obj.NewWriter(o.ctx)
	// TRIGGER WARNING: This can seem like a hack but it works thanks
	// to GCS strong consistency. We will open and write to the same file; First when the
	// writer is closed will the content get committed to GCS.
	// The general idea is this:
	// Objectv1[:offset] -> Objectv2
	// newData1 -> Objectv2
	// Objectv1[offset+len(newData1):] -> Objectv2
	// Objectv2.Close
	//
	// It will however require a download and upload of the original file but it
	// can't be avoided if we should support seek-write-operations on GCS.
	objAttrs, err := o.obj.Attrs(o.ctx)
	if err != nil {
		if off > 0 {
			return 0, err // WriteAt to a non existing file
		}

		// Object does not exist yet and we are writing from the start.
		o.currentGcsSize = 0
	} else {
		o.currentGcsSize = objAttrs.Size
	}

	if off > o.currentGcsSize {
		return 0, ErrOutOfRange
	}

	if off > 0 {
		// Copy the original object's first off bytes into the new writer
		// (step "Objectv1[:offset] -> Objectv2" above).
		var r stiface.Reader
		r, err = o.obj.NewReader(o.ctx)
		if err != nil {
			return 0, err
		}
		if _, err = io.CopyN(w, r, off); err != nil {
			return 0, err
		}
		if err = r.Close(); err != nil {
			return 0, err
		}
	}

	o.writer = w
	o.offset = off

	written, err := o.writer.Write(b)

	o.offset += int64(written)
	return written, err
}
225
// min returns the smaller of the two ints.
func min(x, y int) int {
	if y < x {
		return y
	}
	return x
}
232
233 func (o *gcsFileResource) Truncate(wantedSize int64) error {
234 if wantedSize < 0 {
235 return ErrOutOfRange
236 }
237
238 if err := o.maybeCloseIo(); err != nil {
239 return err
240 }
241
242 r, err := o.obj.NewRangeReader(o.ctx, 0, wantedSize)
243 if err != nil {
244 return err
245 }
246
247 w := o.obj.NewWriter(o.ctx)
248 written, err := io.Copy(w, r)
249 if err != nil {
250 return err
251 }
252
253 for written < wantedSize {
254 // Bulk up padding writes
255 paddingBytes := bytes.Repeat([]byte(" "), min(maxWriteSize, int(wantedSize-written)))
256
257 n := 0
258 if n, err = w.Write(paddingBytes); err != nil {
259 return err
260 }
261
262 written += int64(n)
263 }
264 if err = r.Close(); err != nil {
265 return fmt.Errorf("error closing reader: %v", err)
266 }
267 if err = w.Close(); err != nil {
268 return fmt.Errorf("error closing writer: %v", err)
269 }
270 return nil
271 }