mxnet
io.h
Go to the documentation of this file.
1 
6 #ifndef DMLC_IO_H_
7 #define DMLC_IO_H_
8 #include <cstdio>
9 #include <string>
10 #include <cstring>
11 #include <vector>
12 #include <istream>
13 #include <ostream>
14 #include <streambuf>
15 #include "./logging.h"
16 
17 // include uint64_t only to make io standalone
18 #ifdef _MSC_VER
19 
20 typedef unsigned __int64 uint64_t;
21 #else
22 #include <inttypes.h>
23 #endif
24 
26 namespace dmlc {
/*!
 * \brief interface of stream I/O for serialization
 */
class Stream {  // NOLINT(*)
 public:
  /*!
   * \brief reads data from a stream
   * \param ptr destination buffer, at least size bytes long
   * \param size maximum number of bytes to read
   * \return number of bytes actually placed into ptr (dmlc::istream
   *  treats a return of 0 as end of data)
   */
  virtual size_t Read(void *ptr, size_t size) = 0;
  /*!
   * \brief writes data to a stream
   * \param ptr source buffer
   * \param size number of bytes to write
   */
  virtual void Write(const void *ptr, size_t size) = 0;
  /*! \brief virtual destructor */
  virtual ~Stream() {}
  /*!
   * \brief generic factory function: create a stream for uri; the stream
   *  will close the underlying files upon deletion
   * \param uri the uri to open
   * \param flag open-mode flag (NOTE(review): accepted values are defined by
   *  the implementation, not this header -- confirm)
   * \param allow_null NOTE(review): presumably, when true, NULL is returned
   *  instead of failing -- confirm against the implementation
   * \return the created stream
   */
  static Stream *Create(const char *uri,
                        const char* const flag,
                        bool allow_null = false);
  // helper functions to write/read different data structures
  /*!
   * \brief serialize data into this stream via serializer::Handler<T>
   * \param data the value to write
   */
  template<typename T>
  inline void Write(const T &data);
  /*!
   * \brief deserialize one value via serializer::Handler<T>
   * \param out_data destination of the value
   * \return whether the read succeeded
   */
  template<typename T>
  inline bool Read(T *out_data);
  /*!
   * \brief Endian aware write array of data
   * \param data first element to write
   * \param num_elems number of consecutive elements to write
   */
  template<typename T>
  inline void WriteArray(const T* data, size_t num_elems);
  /*!
   * \brief Endian aware read array of data
   * \param data destination of the first element
   * \param num_elems number of consecutive elements to read
   * \return whether every element was read
   */
  template<typename T>
  inline bool ReadArray(T* data, size_t num_elems);
};
107 
109 class SeekStream: public Stream {
110  public:
111  // virtual destructor
112  virtual ~SeekStream(void) {}
114  virtual void Seek(size_t pos) = 0;
116  virtual size_t Tell(void) = 0;
127  static SeekStream *CreateForRead(const char *uri,
128  bool allow_null = false);
129 };
130 
133  public:
135  virtual ~Serializable() {}
140  virtual void Load(Stream *fi) = 0;
145  virtual void Save(Stream *fo) const = 0;
146 };
147 
155 class InputSplit {
156  public:
158  struct Blob {
160  void *dptr;
162  size_t size;
163  };
172  virtual void HintChunkSize(size_t chunk_size) {}
174  virtual size_t GetTotalSize(void) = 0;
176  virtual void BeforeFirst(void) = 0;
190  virtual bool NextRecord(Blob *out_rec) = 0;
210  virtual bool NextChunk(Blob *out_chunk) = 0;
230  virtual bool NextBatch(Blob *out_chunk, size_t n_records) {
231  return NextChunk(out_chunk);
232  }
242  virtual void ResetPartition(unsigned part_index, unsigned num_parts) = 0;
261  static InputSplit* Create(const char *uri,
262  unsigned part_index,
263  unsigned num_parts,
264  const char *type);
293  static InputSplit* Create(const char *uri,
294  const char *index_uri,
295  unsigned part_index,
296  unsigned num_parts,
297  const char *type,
298  const bool shuffle = false,
299  const int seed = 0,
300  const size_t batch_size = 256,
301  const bool recurse_directories = false);
302 };
303 
304 #ifndef _LIBCPP_SGX_NO_IOSTREAMS
305 
318 class ostream : public std::basic_ostream<char> {
319  public:
325  explicit ostream(Stream *stream,
326  size_t buffer_size = (1 << 10))
327  : std::basic_ostream<char>(NULL), buf_(buffer_size) {
328  this->set_stream(stream);
329  }
330  // explictly synchronize the buffer
332  buf_.pubsync();
333  }
338  inline void set_stream(Stream *stream) {
339  buf_.set_stream(stream);
340  this->rdbuf(&buf_);
341  }
342 
344  inline size_t bytes_written(void) const {
345  return buf_.bytes_out();
346  }
347 
348  private:
349  // internal streambuf
350  class OutBuf : public std::streambuf {
351  public:
352  explicit OutBuf(size_t buffer_size)
353  : stream_(NULL), buffer_(buffer_size), bytes_out_(0) {
354  if (buffer_size == 0) buffer_.resize(2);
355  }
356  // set stream to the buffer
357  inline void set_stream(Stream *stream);
358 
359  inline size_t bytes_out() const { return bytes_out_; }
360  private:
362  Stream *stream_;
364  std::vector<char> buffer_;
366  size_t bytes_out_;
367  // override sync
368  inline int_type sync(void);
369  // override overflow
370  inline int_type overflow(int c);
371  };
373  OutBuf buf_;
374 };
375 
389 class istream : public std::basic_istream<char> {
390  public:
396  explicit istream(Stream *stream,
397  size_t buffer_size = (1 << 10))
398  : std::basic_istream<char>(NULL), buf_(buffer_size) {
399  this->set_stream(stream);
400  }
406  inline void set_stream(Stream *stream) {
407  buf_.set_stream(stream);
408  this->rdbuf(&buf_);
409  }
411  inline size_t bytes_read(void) const {
412  return buf_.bytes_read();
413  }
414 
415  private:
416  // internal streambuf
417  class InBuf : public std::streambuf {
418  public:
419  explicit InBuf(size_t buffer_size)
420  : stream_(NULL), bytes_read_(0),
421  buffer_(buffer_size) {
422  if (buffer_size == 0) buffer_.resize(2);
423  }
424  // set stream to the buffer
425  inline void set_stream(Stream *stream);
426  // return how many bytes read so far
427  inline size_t bytes_read(void) const {
428  return bytes_read_;
429  }
430  private:
432  Stream *stream_;
434  size_t bytes_read_;
436  std::vector<char> buffer_;
437  // override underflow
438  inline int_type underflow();
439  };
441  InBuf buf_;
442 };
443 #endif
444 } // namespace dmlc
445 
446 #include "./serializer.h"
447 
448 namespace dmlc {
449 // implementations of inline functions
450 template<typename T>
451 inline void Stream::Write(const T &data) {
452  serializer::Handler<T>::Write(this, data);
453 }
454 template<typename T>
455 inline bool Stream::Read(T *out_data) {
456  return serializer::Handler<T>::Read(this, out_data);
457 }
458 
459 template<typename T>
460 inline void Stream::WriteArray(const T* data, size_t num_elems) {
461  for (size_t i = 0; i < num_elems; ++i) {
462  this->Write<T>(data[i]);
463  }
464 }
465 
466 template<typename T>
467 inline bool Stream::ReadArray(T* data, size_t num_elems) {
468  for (size_t i = 0; i < num_elems; ++i) {
469  if (!this->Read<T>(data + i)) return false;
470  }
471  return true;
472 }
473 
474 #ifndef _LIBCPP_SGX_NO_IOSTREAMS
475 // implementations for ostream
476 inline void ostream::OutBuf::set_stream(Stream *stream) {
477  if (stream_ != NULL) this->pubsync();
478  this->stream_ = stream;
479  this->setp(&buffer_[0], &buffer_[0] + buffer_.size() - 1);
480 }
481 inline int ostream::OutBuf::sync(void) {
482  if (stream_ == NULL) return -1;
483  std::ptrdiff_t n = pptr() - pbase();
484  stream_->Write(pbase(), n);
485  this->pbump(-static_cast<int>(n));
486  bytes_out_ += n;
487  return 0;
488 }
489 inline int ostream::OutBuf::overflow(int c) {
490  *(this->pptr()) = c;
491  std::ptrdiff_t n = pptr() - pbase();
492  this->pbump(-static_cast<int>(n));
493  if (c == EOF) {
494  stream_->Write(pbase(), n);
495  bytes_out_ += n;
496  } else {
497  stream_->Write(pbase(), n + 1);
498  bytes_out_ += n + 1;
499  }
500  return c;
501 }
502 
503 // implementations for istream
504 inline void istream::InBuf::set_stream(Stream *stream) {
505  stream_ = stream;
506  this->setg(&buffer_[0], &buffer_[0], &buffer_[0]);
507 }
508 inline int istream::InBuf::underflow() {
509  char *bhead = &buffer_[0];
510  if (this->gptr() == this->egptr()) {
511  size_t sz = stream_->Read(bhead, buffer_.size());
512  this->setg(bhead, bhead, bhead + sz);
513  bytes_read_ += sz;
514  }
515  if (this->gptr() == this->egptr()) {
516  return traits_type::eof();
517  } else {
518  return traits_type::to_int_type(*gptr());
519  }
520 }
521 #endif
522 
523 namespace io {
/*! \brief common data structure for URI */
struct URI {
  /*! \brief protocol, including the trailing "://" (empty when absent) */
  std::string protocol;
  /*! \brief host name, namenode for HDFS, bucket name for s3 */
  std::string host;
  /*! \brief name of the path */
  std::string name;
  /*! \brief enable default constructor */
  URI(void) {}
  /*!
   * \brief construct from URI string
   * \param uri the URI text; a NULL pointer yields an empty URI
   */
  explicit URI(const char *uri) {
    // strstr/strchr have undefined behavior on NULL, so treat it as "no URI".
    if (uri == NULL) return;
    const char *p = std::strstr(uri, "://");
    if (p == NULL) {
      // no protocol: the whole string is a plain path
      name = uri;
    } else {
      protocol = std::string(uri, p - uri + 3);
      uri = p + 3;
      p = std::strchr(uri, '/');
      if (p == NULL) {
        // "proto://host" without a path component: default to the root path
        host = uri; name = '/';
      } else {
        host = std::string(uri, p - uri);
        name = p;
      }
    }
  }
  /*! \brief string representation: protocol + host + name */
  inline std::string str(void) const {
    return protocol + host + name;
  }
};
560 
/*!
 * \brief type of file
 *
 * The enumerators had been lost from this listing (io.h:564/566) although
 * kFile is used by FileInfo; restored.
 */
enum FileType {
  /*! \brief the file is file */
  kFile,
  /*! \brief the file is directory */
  kDirectory
};
568 
570 struct FileInfo {
574  size_t size;
578  FileInfo() : size(0), type(kFile) {}
579 };
580 
582 class FileSystem {
583  public:
591  static FileSystem *GetInstance(const URI &path);
593  virtual ~FileSystem() {}
599  virtual FileInfo GetPathInfo(const URI &path) = 0;
605  virtual void ListDirectory(const URI &path, std::vector<FileInfo> *out_list) = 0;
611  virtual void ListDirectoryRecursive(const URI &path,
612  std::vector<FileInfo> *out_list);
620  virtual Stream *Open(const URI &path,
621  const char* const flag,
622  bool allow_null = false) = 0;
629  virtual SeekStream *OpenForRead(const URI &path,
630  bool allow_null = false) = 0;
631 };
632 
633 } // namespace io
634 } // namespace dmlc
635 #endif // DMLC_IO_H_
a blob of memory region
Definition: io.h:158
FileType type
the type of the file
Definition: io.h:576
virtual ~SeekStream(void)
Definition: io.h:112
URI(void)
enable default constructor
Definition: io.h:535
virtual ~FileSystem()
virtual destructor
Definition: io.h:593
virtual ~istream() DMLC_NO_EXCEPTION
Definition: io.h:401
interface for serializable objects
Definition: io.h:132
#define DMLC_THROW_EXCEPTION
Definition: base.h:224
virtual ~Serializable()
virtual destructor
Definition: io.h:135
URI(const char *uri)
construct from URI string
Definition: io.h:539
std::string str(void) const
string representation
Definition: io.h:556
FileInfo()
default constructor
Definition: io.h:578
#define DMLC_NO_EXCEPTION
Definition: base.h:225
void set_stream(Stream *stream)
set internal stream to be stream, reset states
Definition: io.h:406
serializer template class that helps serialization. This file does not need to be directly used by most...
static Stream * Create(const char *uri, const char *const flag, bool allow_null=false)
generic factory function that creates a stream; the stream will close the underlying files upon deletion ...
virtual ~InputSplit(void) DMLC_THROW_EXCEPTION
destructor
Definition: io.h:234
Definition: optional.h:241
virtual ~Stream(void)
virtual destructor
Definition: io.h:46
common data structure for URI
Definition: io.h:525
ostream(Stream *stream, size_t buffer_size=(1<< 10))
construct std::ostream type
Definition: io.h:325
interface of stream I/O for serialization
Definition: io.h:30
file system interface
Definition: io.h:582
static void Write(Stream *strm, const T &data)
write data to stream
Definition: serializer.h:265
use to store file information
Definition: io.h:570
void WriteArray(const T *data, size_t num_elems)
Endian aware write array of data.
Definition: io.h:460
virtual bool NextBatch(Blob *out_chunk, size_t n_records)
get a chunk of memory that can contain multiple records, with a hint for how many records are needed...
Definition: io.h:230
FileType
type of file
Definition: io.h:562
size_t size
the size of the file
Definition: io.h:574
std::string protocol
protocol
Definition: io.h:527
URI path
full path to the file
Definition: io.h:572
namespace for dmlc
Definition: array_view.h:12
a std::ostream class that can wrap Stream objects; use this ostream to write output to the underlying...
Definition: io.h:318
size_t size
size of the memory region
Definition: io.h:162
bool ReadArray(T *data, size_t num_elems)
Endian aware read array of data.
Definition: io.h:467
input split that allows reading records from a split of the data, an independent part that covers a...
Definition: io.h:155
virtual size_t Read(void *ptr, size_t size)=0
reads data from a stream
virtual ~ostream() DMLC_NO_EXCEPTION
Definition: io.h:331
interface of i/o stream that supports seek
Definition: io.h:109
void * dptr
points to start of the memory region
Definition: io.h:160
virtual void HintChunkSize(size_t chunk_size)
hint to the InputSplit how large a chunk it should return when implementing NextChunk; this is a h...
Definition: io.h:172
virtual void Write(const void *ptr, size_t size)=0
writes data to a stream
the file is directory
Definition: io.h:566
size_t bytes_read(void) const
Definition: io.h:411
std::string host
host name, namenode for HDFS, bucket name for s3
Definition: io.h:531
std::string name
name of the path
Definition: io.h:533
the file is file
Definition: io.h:564
void set_stream(Stream *stream)
set internal stream to be stream, reset states
Definition: io.h:338
a std::istream class that can wrap Stream objects; use this istream to read input from the underlying...
Definition: io.h:389
istream(Stream *stream, size_t buffer_size=(1<< 10))
construct std::istream type
Definition: io.h:396
static bool Read(Stream *strm, T *data)
read data from stream
Definition: serializer.h:283
size_t bytes_written(void) const
Definition: io.h:344