mxnet
io.h
Go to the documentation of this file.
1 
6 #ifndef DMLC_IO_H_
7 #define DMLC_IO_H_
8 #include <cstdio>
9 #include <string>
10 #include <cstring>
11 #include <vector>
12 #include <istream>
13 #include <ostream>
14 #include <streambuf>
15 #include "./logging.h"
16 
17 // include uint64_t only to make io standalone
18 #ifdef _MSC_VER
19 
20 typedef unsigned __int64 uint64_t;
21 #else
22 #include <inttypes.h>
23 #endif
24 
26 namespace dmlc {
30 class Stream { // NOLINT(*)
31  public:
38  virtual size_t Read(void *ptr, size_t size) = 0;
44  virtual void Write(const void *ptr, size_t size) = 0;
46  virtual ~Stream(void) {}
57  static Stream *Create(const char *uri,
58  const char* const flag,
59  bool allow_null = false);
60  // helper functions to write/read different data structures
73  template<typename T>
74  inline void Write(const T &data);
87  template<typename T>
88  inline bool Read(T *out_data);
95  template<typename T>
96  inline void WriteArray(const T* data, size_t num_elems);
104  template<typename T>
105  inline bool ReadArray(T* data, size_t num_elems);
106 };
107 
109 class SeekStream: public Stream {
110  public:
111  // virtual destructor
112  virtual ~SeekStream(void) {}
114  virtual void Seek(size_t pos) = 0;
116  virtual size_t Tell(void) = 0;
127  static SeekStream *CreateForRead(const char *uri,
128  bool allow_null = false);
129 };
130 
133  public:
135  virtual ~Serializable() {}
140  virtual void Load(Stream *fi) = 0;
145  virtual void Save(Stream *fo) const = 0;
146 };
147 
155 class InputSplit {
156  public:
158  struct Blob {
160  void *dptr;
162  size_t size;
163  };
172  virtual void HintChunkSize(size_t chunk_size) {}
174  virtual size_t GetTotalSize(void) = 0;
176  virtual void BeforeFirst(void) = 0;
190  virtual bool NextRecord(Blob *out_rec) = 0;
210  virtual bool NextChunk(Blob *out_chunk) = 0;
230  virtual bool NextBatch(Blob *out_chunk, size_t n_records) {
231  return NextChunk(out_chunk);
232  }
242  virtual void ResetPartition(unsigned part_index, unsigned num_parts) = 0;
261  static InputSplit* Create(const char *uri,
262  unsigned part_index,
263  unsigned num_parts,
264  const char *type);
293  static InputSplit* Create(const char *uri,
294  const char *index_uri,
295  unsigned part_index,
296  unsigned num_parts,
297  const char *type,
298  const bool shuffle = false,
299  const int seed = 0,
300  const size_t batch_size = 256,
301  const bool recurse_directories = false);
302 };
303 
304 #ifndef _LIBCPP_SGX_NO_IOSTREAMS
305 
318 class ostream : public std::basic_ostream<char> {
319  public:
325  explicit ostream(Stream *stream,
326  size_t buffer_size = (1 << 10))
327  : std::basic_ostream<char>(NULL), buf_(buffer_size) {
328  this->set_stream(stream);
329  }
330  // explictly synchronize the buffer
332  buf_.pubsync();
333  }
338  inline void set_stream(Stream *stream) {
339  buf_.set_stream(stream);
340  this->rdbuf(&buf_);
341  }
342 
344  inline size_t bytes_written(void) const {
345  return buf_.bytes_out();
346  }
347 
348  private:
349  // internal streambuf
350  class OutBuf : public std::streambuf {
351  public:
352  explicit OutBuf(size_t buffer_size)
353  : stream_(NULL), buffer_(buffer_size), bytes_out_(0) {
354  if (buffer_size == 0) buffer_.resize(2);
355  }
356  // set stream to the buffer
357  inline void set_stream(Stream *stream);
358 
359  inline size_t bytes_out() const { return bytes_out_; }
360  private:
362  Stream *stream_;
364  std::vector<char> buffer_;
366  size_t bytes_out_;
367  // override sync
368  inline int_type sync(void);
369  // override overflow
370  inline int_type overflow(int c);
371  };
373  OutBuf buf_;
374 };
375 
389 class istream : public std::basic_istream<char> {
390  public:
396  explicit istream(Stream *stream,
397  size_t buffer_size = (1 << 10))
398  : std::basic_istream<char>(NULL), buf_(buffer_size) {
399  this->set_stream(stream);
400  }
406  inline void set_stream(Stream *stream) {
407  buf_.set_stream(stream);
408  this->rdbuf(&buf_);
409  }
411  inline size_t bytes_read(void) const {
412  return buf_.bytes_read();
413  }
414 
415  private:
416  // internal streambuf
417  class InBuf : public std::streambuf {
418  public:
419  explicit InBuf(size_t buffer_size)
420  : stream_(NULL), bytes_read_(0),
421  buffer_(buffer_size) {
422  if (buffer_size == 0) buffer_.resize(2);
423  }
424  // set stream to the buffer
425  inline void set_stream(Stream *stream);
426  // return how many bytes read so far
427  inline size_t bytes_read(void) const {
428  return bytes_read_;
429  }
430  private:
432  Stream *stream_;
434  size_t bytes_read_;
436  std::vector<char> buffer_;
437  // override underflow
438  inline int_type underflow();
439  };
441  InBuf buf_;
442 };
443 #endif
444 } // namespace dmlc
445 
446 #include "./serializer.h"
447 
448 namespace dmlc {
449 // implementations of inline functions
450 template<typename T>
451 inline void Stream::Write(const T &data) {
452  serializer::Handler<T>::Write(this, data);
453 }
454 template<typename T>
455 inline bool Stream::Read(T *out_data) {
456  return serializer::Handler<T>::Read(this, out_data);
457 }
458 
459 template<typename T>
460 inline void Stream::WriteArray(const T* data, size_t num_elems) {
461  for (size_t i = 0; i < num_elems; ++i) {
462  this->Write<T>(data[i]);
463  }
464 }
465 
466 template<typename T>
467 inline bool Stream::ReadArray(T* data, size_t num_elems) {
468  for (size_t i = 0; i < num_elems; ++i) {
469  if (!this->Read<T>(data + i)) return false;
470  }
471  return true;
472 }
473 
474 #ifndef _LIBCPP_SGX_NO_IOSTREAMS
475 // implementations for ostream
476 inline void ostream::OutBuf::set_stream(Stream *stream) {
477  if (stream_ != NULL) this->pubsync();
478  this->stream_ = stream;
479  this->setp(&buffer_[0], &buffer_[0] + buffer_.size() - 1);
480 }
481 inline int ostream::OutBuf::sync(void) {
482  if (stream_ == NULL) return -1;
483  std::ptrdiff_t n = pptr() - pbase();
484  stream_->Write(pbase(), n);
485  this->pbump(-static_cast<int>(n));
486  bytes_out_ += n;
487  return 0;
488 }
489 inline int ostream::OutBuf::overflow(int c) {
490  *(this->pptr()) = c;
491  std::ptrdiff_t n = pptr() - pbase();
492  this->pbump(-static_cast<int>(n));
493  if (c == EOF) {
494  stream_->Write(pbase(), n);
495  bytes_out_ += n;
496  } else {
497  stream_->Write(pbase(), n + 1);
498  bytes_out_ += n + 1;
499  }
500  return c;
501 }
502 
503 // implementations for istream
504 inline void istream::InBuf::set_stream(Stream *stream) {
505  stream_ = stream;
506  this->setg(&buffer_[0], &buffer_[0], &buffer_[0]);
507 }
508 inline int istream::InBuf::underflow() {
509  char *bhead = &buffer_[0];
510  if (this->gptr() == this->egptr()) {
511  size_t sz = stream_->Read(bhead, buffer_.size());
512  this->setg(bhead, bhead, bhead + sz);
513  bytes_read_ += sz;
514  }
515  if (this->gptr() == this->egptr()) {
516  return traits_type::eof();
517  } else {
518  return traits_type::to_int_type(*gptr());
519  }
520 }
521 #endif
522 
523 namespace io {
525 struct URI {
527  std::string protocol;
531  std::string host;
533  std::string name;
535  URI(void) {}
539  explicit URI(const char *uri) {
540  const char *p = std::strstr(uri, "://");
541  if (p == NULL) {
542  name = uri;
543  } else {
544  protocol = std::string(uri, p - uri + 3);
545  uri = p + 3;
546  p = std::strchr(uri, '/');
547  if (p == NULL) {
548  host = uri; name = '/';
549  } else {
550  host = std::string(uri, p - uri);
551  name = p;
552  }
553  }
554  }
556  inline std::string str(void) const {
557  return protocol + host + name;
558  }
559 };
560 
562 enum FileType {
567 };
568 
570 struct FileInfo {
574  size_t size;
578  FileInfo() : size(0), type(kFile) {}
579 };
580 
582 class FileSystem {
583  public:
591  static FileSystem *GetInstance(const URI &path);
593  virtual ~FileSystem() {}
599  virtual FileInfo GetPathInfo(const URI &path) = 0;
605  virtual void ListDirectory(const URI &path, std::vector<FileInfo> *out_list) = 0;
611  virtual void ListDirectoryRecursive(const URI &path,
612  std::vector<FileInfo> *out_list);
620  virtual Stream *Open(const URI &path,
621  const char* const flag,
622  bool allow_null = false) = 0;
629  virtual SeekStream *OpenForRead(const URI &path,
630  bool allow_null = false) = 0;
631 };
632 
633 } // namespace io
634 } // namespace dmlc
635 #endif // DMLC_IO_H_
dmlc::io::URI::name
std::string name
name of the path
Definition: io.h:533
dmlc::io::URI::URI
URI(void)
enable default constructor
Definition: io.h:535
dmlc::istream::istream
istream(Stream *stream, size_t buffer_size=(1<< 10))
construct std::ostream type
Definition: io.h:396
dmlc::Serializable::~Serializable
virtual ~Serializable()
virtual destructor
Definition: io.h:135
dmlc
namespace for dmlc
Definition: array_view.h:12
dmlc::InputSplit::Create
static InputSplit * Create(const char *uri, unsigned part_index, unsigned num_parts, const char *type)
factory function: create input split given a uri
dmlc::io::FileSystem::GetInstance
static FileSystem * GetInstance(const URI &path)
get singleton of filesystem instance according to URI
DMLC_NO_EXCEPTION
#define DMLC_NO_EXCEPTION
Definition: base.h:234
dmlc::io::FileSystem::ListDirectory
virtual void ListDirectory(const URI &path, std::vector< FileInfo > *out_list)=0
list files in a directory
dmlc::SeekStream::~SeekStream
virtual ~SeekStream(void)
Definition: io.h:112
dmlc::io::FileInfo::type
FileType type
the type of the file
Definition: io.h:576
dmlc::InputSplit::Blob
a blob of memory region
Definition: io.h:158
dmlc::SeekStream::Tell
virtual size_t Tell(void)=0
tell the position of the stream
dmlc::ostream
a std::ostream class that can can wrap Stream objects, can use ostream with that output to underlying...
Definition: io.h:318
dmlc::io::FileSystem::OpenForRead
virtual SeekStream * OpenForRead(const URI &path, bool allow_null=false)=0
open a seekable stream for read
dmlc::InputSplit::NextRecord
virtual bool NextRecord(Blob *out_rec)=0
get the next record, the returning value is valid until next call to NextRecord, NextChunk or NextBat...
dmlc::io::FileInfo::size
size_t size
the size of the file
Definition: io.h:574
dmlc::InputSplit::~InputSplit
virtual ~InputSplit(void) DMLC_THROW_EXCEPTION
destructor
Definition: io.h:234
dmlc::Stream::ReadArray
bool ReadArray(T *data, size_t num_elems)
Endian aware read array of data.
Definition: io.h:467
dmlc::InputSplit::BeforeFirst
virtual void BeforeFirst(void)=0
reset the position of InputSplit to beginning
dmlc::Serializable::Load
virtual void Load(Stream *fi)=0
load the model from a stream
dmlc::ostream::~ostream
virtual ~ostream() DMLC_NO_EXCEPTION
Definition: io.h:331
dmlc::InputSplit::Blob::dptr
void * dptr
points to start of the memory region
Definition: io.h:160
dmlc::Stream::Write
virtual void Write(const void *ptr, size_t size)=0
writes data to a stream
dmlc::io::FileInfo::FileInfo
FileInfo()
default constructor
Definition: io.h:578
dmlc::InputSplit::GetTotalSize
virtual size_t GetTotalSize(void)=0
get the total size of the InputSplit
dmlc::InputSplit::ResetPartition
virtual void ResetPartition(unsigned part_index, unsigned num_parts)=0
reset the Input split to a certain part id, The InputSplit will be pointed to the head of the new spe...
dmlc::istream::bytes_read
size_t bytes_read(void) const
Definition: io.h:411
dmlc::io::FileInfo::path
URI path
full path to the file
Definition: io.h:572
dmlc::io::URI::URI
URI(const char *uri)
construct from URI string
Definition: io.h:539
dmlc::InputSplit::NextChunk
virtual bool NextChunk(Blob *out_chunk)=0
get a chunk of memory that can contain multiple records, the caller needs to parse the content of the...
dmlc::io::kDirectory
@ kDirectory
the file is directory
Definition: io.h:566
dmlc::ostream::ostream
ostream(Stream *stream, size_t buffer_size=(1<< 10))
construct std::ostream type
Definition: io.h:325
dmlc::SeekStream
interface of i/o stream that support seek
Definition: io.h:109
dmlc::io::URI::protocol
std::string protocol
protocol
Definition: io.h:527
dmlc::io::URI
common data structure for URI
Definition: io.h:525
serializer.h
serializer template class that helps serialization. This file do not need to be directly used by most...
dmlc::io::URI::host
std::string host
host name, namenode for HDFS, bucket name for s3
Definition: io.h:531
dmlc::Serializable::Save
virtual void Save(Stream *fo) const =0
saves the model to a stream
dmlc::io::FileSystem::Open
virtual Stream * Open(const URI &path, const char *const flag, bool allow_null=false)=0
open a stream
dmlc::io::FileSystem::ListDirectoryRecursive
virtual void ListDirectoryRecursive(const URI &path, std::vector< FileInfo > *out_list)
list files in a directory recursively using ListDirectory
dmlc::ostream::bytes_written
size_t bytes_written(void) const
Definition: io.h:344
dmlc::io::FileSystem::~FileSystem
virtual ~FileSystem()
virtual destructor
Definition: io.h:593
dmlc::ostream::set_stream
void set_stream(Stream *stream)
set internal stream to be stream, reset states
Definition: io.h:338
dmlc::Stream::Read
virtual size_t Read(void *ptr, size_t size)=0
reads data from a stream
dmlc::istream
a std::istream class that can can wrap Stream objects, can use istream with that output to underlying...
Definition: io.h:389
dmlc::InputSplit::HintChunkSize
virtual void HintChunkSize(size_t chunk_size)
hint the inputsplit how large the chunk size it should return when implementing NextChunk this is a h...
Definition: io.h:172
dmlc::io::FileInfo
use to store file information
Definition: io.h:570
dmlc::serializer::Handler::Write
static void Write(Stream *strm, const T &data)
write data to stream
Definition: serializer.h:265
dmlc::SeekStream::CreateForRead
static SeekStream * CreateForRead(const char *uri, bool allow_null=false)
generic factory function create an SeekStream for read only, the stream will close the underlying fil...
dmlc::io::FileType
FileType
type of file
Definition: io.h:562
std
Definition: optional.h:251
dmlc::Stream::~Stream
virtual ~Stream(void)
virtual destructor
Definition: io.h:46
dmlc::istream::~istream
virtual ~istream() DMLC_NO_EXCEPTION
Definition: io.h:401
dmlc::Serializable
interface for serializable objects
Definition: io.h:132
dmlc::InputSplit::Blob::size
size_t size
size of the memory region
Definition: io.h:162
dmlc::io::kFile
@ kFile
the file is file
Definition: io.h:564
dmlc::io::URI::str
std::string str(void) const
string representation
Definition: io.h:556
dmlc::Stream::WriteArray
void WriteArray(const T *data, size_t num_elems)
Endian aware write array of data.
Definition: io.h:460
DMLC_THROW_EXCEPTION
#define DMLC_THROW_EXCEPTION
Definition: base.h:233
dmlc::io::FileSystem
file system system interface
Definition: io.h:582
dmlc::InputSplit::NextBatch
virtual bool NextBatch(Blob *out_chunk, size_t n_records)
get a chunk of memory that can contain multiple records, with hint for how many records is needed,...
Definition: io.h:230
dmlc::Stream
interface of stream I/O for serialization
Definition: io.h:30
dmlc::Stream::Create
static Stream * Create(const char *uri, const char *const flag, bool allow_null=false)
generic factory function create an stream, the stream will close the underlying files upon deletion
dmlc::istream::set_stream
void set_stream(Stream *stream)
set internal stream to be stream, reset states
Definition: io.h:406
dmlc::io::FileSystem::GetPathInfo
virtual FileInfo GetPathInfo(const URI &path)=0
get information about a path
dmlc::SeekStream::Seek
virtual void Seek(size_t pos)=0
seek to certain position of the file
dmlc::InputSplit
input split creates that allows reading of records from split of data, independent part that covers a...
Definition: io.h:155
dmlc::serializer::Handler::Read
static bool Read(Stream *strm, T *data)
read data to stream
Definition: serializer.h:283