diff --git a/src/XrdCeph/XrdCephOss.cc b/src/XrdCeph/XrdCephOss.cc index 5ff411d33f4..c6e043c0bc1 100644 --- a/src/XrdCeph/XrdCephOss.cc +++ b/src/XrdCeph/XrdCephOss.cc @@ -127,6 +127,9 @@ ssize_t getNumericAttr(const char* const path, const char* attrName, const int m return retval; } +char *g_cksLogFileName; + +extern FILE *g_cksLogFile; extern "C" { @@ -165,6 +168,12 @@ XrdCephOss::~XrdCephOss() { extern unsigned int g_maxCephPoolIdx; extern unsigned int g_cephAioWaitThresh; +extern bool g_calcStreamedAdler32; +extern bool g_storeStreamedAdler32; +extern bool g_logStreamedAdler32; + + + int XrdCephOss::Configure(const char *configfn, XrdSysError &Eroute) { int NoGo = 0; XrdOucEnv myEnv; @@ -348,7 +357,7 @@ int XrdCephOss::Configure(const char *configfn, XrdSysError &Eroute) { if (!Config.GetRest(parms, sizeof(parms)) || parms[0]) { Eroute.Emsg("Config", "readvalgname parameters will be ignored"); } - m_configBufferIOmode = var; // allowed values would be aio, io + m_configBufferIOmode = var; // allowed values would be aio, io, write-only-io } else { Eroute.Emsg("Config", "Missing value for ceph.bufferiomode in config file", configfn); return 1; @@ -361,9 +370,59 @@ int XrdCephOss::Configure(const char *configfn, XrdSysError &Eroute) { m_configPoolnames = var; } else { Eroute.Emsg("Config", "Missing value for ceph.reportingpools in config file", configfn); - return 1; + return 1; } - } + } + if (!strcmp(var, "ceph.streamed-cks-adler32")) { // Streaming Adler32 checksum + + var = Config.GetWord(); + if (var) { +/* + * Currently, actions are simply additive: + * + * Store implies calculate, log, store + * Log implies calculate, log + * Calc implies calculate + * + * Might want to make e.g. logging optional in the future, + * when storing is more prevalent. 
+ * + * Instead of setting g_* flags in three conditionals, + * can switch to setting values in a single bitfield flag + * + */ + if (strstr(var, "calc")) { + g_calcStreamedAdler32 = true; + g_logStreamedAdler32 = false; + g_storeStreamedAdler32 = false; + } + if (strstr(var, "log")) { + g_calcStreamedAdler32 = true; + g_logStreamedAdler32 = true; + g_storeStreamedAdler32 = false; + } + if (strstr(var, "store")) { + g_calcStreamedAdler32 = true; + g_logStreamedAdler32 = true; + g_storeStreamedAdler32 = true; + } + + } + }// "ceph.streamed-cks-adler32" + + if (!strcmp(var, "ceph.streamed-cks-logfile") ) { + var = Config.GetWord(); + if (var) { + g_cksLogFileName = strdup(var); + } else { + const char *defLogFileName = "/tmp/checksums.log"; // To-DO: Move defLogFileName so it can also be used as fallback + // when attempt to open specified log file below fails + Eroute.Emsg("Config", "Missing value for ceph.streamed-cks-logfile in config file, setting to default = ", defLogFileName); + g_cksLogFileName = strdup(defLogFileName); + return 1; + } + }// "ceph.streamed-cks-logfile" + } // while // Now check if any errors occurred during file i/o @@ -374,6 +433,17 @@ int XrdCephOss::Configure(const char *configfn, XrdSysError &Eroute) { configfn); } Config.Close(); + + if (g_logStreamedAdler32) { + if (NULL == (g_cksLogFile = fopen(g_cksLogFileName, "a"))) { + g_logStreamedAdler32 = false; + Eroute.Emsg("Config: ", "cannot open file for logging checksum values and pathname", g_cksLogFileName); + return 1; + } else { + Eroute.Emsg("Config: ", "Opened file for logging checksum values and pathname: ", g_cksLogFileName); + } + } + } return NoGo; } diff --git a/src/XrdCeph/XrdCephOssBufferedFile.cc b/src/XrdCeph/XrdCephOssBufferedFile.cc index 0862d39b4d0..63169cd99b9 100644 --- a/src/XrdCeph/XrdCephOssBufferedFile.cc +++ b/src/XrdCeph/XrdCephOssBufferedFile.cc @@ -151,6 +151,10 @@ ssize_t XrdCephOssBufferedFile::Read(off_t offset, size_t blen) { ssize_t 
/// Returns the current UTC time formatted as "YYYY-MM-DD hh:mm:ssZ".
///
/// The string is allocated with strdup(): the caller owns it and must
/// release it with free(). If the time cannot be converted or formatted,
/// an empty string is returned instead of indeterminate buffer contents.
/// strdup() itself may return nullptr when out of memory.
char *ts_rfc3339() {
  // Size the buffer from the exact shape of the output. The format below
  // appends a literal 'Z', so the template must include it; otherwise
  // strftime() has no room for the terminating NUL, returns 0 and leaves
  // the array contents unspecified.
  char timeString[sizeof("yyyy-mm-dd hh:mm:ssZ")] = {};

  const std::time_t now = std::time(nullptr);

  // gmtime_r() writes into caller-provided storage, so concurrent callers
  // do not share the static buffer that gmtime() returns.
  std::tm utc{};
  if (gmtime_r(&now, &utc) == nullptr ||
      std::strftime(timeString, sizeof(timeString), "%F %TZ", &utc) == 0) {
    timeString[0] = '\0';
  }
  return strdup(timeString);
}
/// Maps a value in the range [0, 15] to its lowercase hexadecimal digit.
constexpr char hex2ascii(char nibble) {
  return (0 <= nibble && nibble <= 9) ? nibble + '0' : nibble - 10 + 'a';
}

/// Extracts the upper four bits of a byte as a value in [0, 15].
constexpr char hiNibble(uint8_t hexbyte) { return (hexbyte & 0xf0) >> 4; }

/// Extracts the lower four bits of a byte as a value in [0, 15].
constexpr char loNibble(uint8_t hexbyte) { return (hexbyte & 0x0f); }

/// Renders `length` raw bytes as a lowercase hexadecimal string
/// (two characters per byte, most significant nibble first).
///
/// @param bytes   Raw input bytes; a null pointer is treated as empty input.
/// @param length  Number of bytes to convert.
/// @return A NUL-terminated string allocated with strdup(). The caller owns
///         it and must release it with free(). May be nullptr if strdup()
///         fails to allocate.
///
/// Not constexpr: the result is heap-allocated through strdup(), so the
/// function can never be evaluated in a constant expression.
char *hexbytes2ascii(const char bytes[], const unsigned int length) {
  const unsigned int count = (bytes != nullptr) ? length : 0;

  // The output grows with the input, so checksums wider than four bytes
  // cannot run past a fixed-size buffer.
  std::string asciiVal;
  asciiVal.reserve(2 * static_cast<std::size_t>(count));

  for (unsigned int i = 0; i < count; ++i) {
    // Go through uint8_t so bytes >= 0x80 are not sign-extended.
    const uint8_t hexbyte = static_cast<uint8_t>(bytes[i]);
    asciiVal.push_back(hex2ascii(hiNibble(hexbyte)));
    asciiVal.push_back(hex2ascii(loNibble(hexbyte)));
  }
  return strdup(asciiVal.c_str());
}
/// Formats a 32-bit Adler32 checksum as exactly eight lowercase hex digits.
///
/// @param adler32  Checksum value; only the low 32 bits are used.
/// @return A NUL-terminated 8-character string allocated with strdup().
///         The caller owns it and must release it with free(). May be
///         nullptr if strdup() fails to allocate.
///
/// Unless Xrd_Big_Endian is defined, the value is passed through htonl()
/// before printing.
/// NOTE(review): presumably this undoes the byte order of the checksum
/// bytes the caller copies in - confirm against the checksum calculator.
const char* formatAdler32(unsigned long adler32) {
  // Drop anything above 32 bits up front. On platforms where unsigned long
  // is 64 bits wide, printing stray upper bits with "%08lx" would produce
  // more than eight digits and run past the buffer below.
  uint32_t cks = static_cast<uint32_t>(adler32 & 0xffffffffUL);

#ifndef Xrd_Big_Endian
  cks = htonl(cks);
#endif

  char adler32Cks[8 + 1];
  // snprintf bounds the write to the buffer regardless of the value printed.
  snprintf(adler32Cks, sizeof(adler32Cks), "%08lx",
           static_cast<unsigned long>(cks));
  return strdup(adler32Cks);
}
adler32Cks); + fflush(g_cksLogFile); + } + + if (g_storeStreamedAdler32) { + int rc = setXrdCksAttr(fd, "adler32", adler32Cks); + if (rc != 0) { + logwrapper((char*)"ceph_close: Can't set attribute XrdCks.adler32 for checksum"); + } + } + delete fr->cksCalcadler32; + } + } + deleteFileRef(fd, *fr); return 0; } else { @@ -838,6 +938,9 @@ ssize_t ceph_posix_write(int fd, const void *buf, size_t count) { fr->wrcount++; fr->bytesWritten+=count; if (fr->offset) fr->maxOffsetWritten = std::max(fr->offset - 1, fr->maxOffsetWritten); + if (g_calcStreamedAdler32) { + fr->cksCalcadler32->Update((const char*)buf, count); + } return count; } else { return -EBADF; @@ -848,7 +951,8 @@ ssize_t ceph_posix_pwrite(int fd, const void *buf, size_t count, off64_t offset) CephFileRef* fr = getFileRef(fd); if (fr) { // TODO implement proper logging level for this plugin - this should be only debug - //logwrapper((char*)"ceph_write: for fd %d, count=%d", fd, count); + //logwrapper((char*)"ceph_posix_pwrite: for fd %d, count=%d", fd, count); + if ((fr->flags & O_ACCMODE) == O_RDONLY) { return -EBADF; } @@ -859,11 +963,17 @@ ssize_t ceph_posix_pwrite(int fd, const void *buf, size_t count, off64_t offset) ceph::bufferlist bl; bl.append((const char*)buf, count); int rc = striper->write(fr->name, bl, count, offset); + if (rc) return rc; XrdSysMutexHelper lock(fr->statsMutex); fr->wrcount++; fr->bytesWritten+=count; if (offset + count) fr->maxOffsetWritten = std::max(uint64_t(offset + count - 1), fr->maxOffsetWritten); + + if (g_calcStreamedAdler32) { + fr->cksCalcadler32->Update((const char*)buf, count); + } + return count; } else { return -EBADF; @@ -908,7 +1018,7 @@ ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb) { const char *buf = (const char*)aiop->sfsAio.aio_buf; size_t offset = aiop->sfsAio.aio_offset; // TODO implement proper logging level for this plugin - this should be only debug - //logwrapper((char*)"ceph_aio_write: for fd %d, count=%d", fd, count); + 
logwrapper((char*)"ceph_aio_write: for fd %d, count=%d", fd, count); if ((fr->flags & O_ACCMODE) == O_RDONLY) { return -EBADF; } @@ -938,6 +1048,9 @@ ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb) { fr->asyncWrStartCount++; ::gettimeofday(&fr->lastAsyncSubmission, nullptr); fr->bytesAsyncWritePending+=count; + if (g_calcStreamedAdler32) { + fr->cksCalcadler32->Update((const char*)buf, count); + } return rc; } else { return -EBADF; @@ -1349,8 +1462,23 @@ static ssize_t ceph_posix_internal_setxattr(const CephFile &file, const char* na ssize_t ceph_posix_setxattr(XrdOucEnv* env, const char* path, const char* name, const void* value, size_t size, int flags) { + int rc; + + auto *cks = (XrdCksData*)value; logwrapper((char*)"ceph_setxattr: path %s name=%s value=%s", path, name, value); - return ceph_posix_internal_setxattr(getCephFile(path, env), name, value, size, flags); + rc = ceph_posix_internal_setxattr(getCephFile(path, env), name, value, size, flags); + + if (0 == rc && !strcmp(name, "XrdCks.adler32") && g_logStreamedAdler32) { +// +// We know that streamed checksums use ceph_posix_fsetxattr below, so this must be a readback checksum +// + auto cksAscii = (const char*)hexbytes2ascii(cks->Value, cks->Length); + logwrapper((char*)"readback checksum = %s", cksAscii); + fprintf(g_cksLogFile, "%s,%s,%s,%s,%s\n", ts_rfc3339(), path, "readback", "adler32", cksAscii); + fflush(g_cksLogFile); + + } + return rc; } int ceph_posix_fsetxattr(int fd, @@ -1543,7 +1671,6 @@ int ceph_posix_unlink(XrdOucEnv* env, const char *pathname) { logwrapper((char*)"ceph_posix_unlink : %s", pathname); // start the timer auto timer_start = std::chrono::steady_clock::now(); - // minimal stat : only size and times are filled CephFile file = getCephFile(pathname, env); libradosstriper::RadosStriper *striper = getRadosStriper(file); diff --git a/src/XrdCeph/XrdCephPosix.hh b/src/XrdCeph/XrdCephPosix.hh index 5d01129b6ae..1dfee919527 100644 --- a/src/XrdCeph/XrdCephPosix.hh +++ 
b/src/XrdCeph/XrdCephPosix.hh @@ -39,6 +39,7 @@ #include "XrdSys/XrdSysPthread.hh" #include "XrdOuc/XrdOucIOVec.hh" +#include // simple logging for XrdCeph buffering code #define XRDCEPHLOGLEVEL 1 @@ -107,6 +108,7 @@ struct CephFile { unsigned int nbStripes; unsigned long long stripeUnit; unsigned long long objectSize; + }; struct CephFileRef : CephFile { @@ -127,6 +129,8 @@ struct CephFileRef : CephFile { ::timeval lastAsyncSubmission; double longestAsyncWriteTime; double longestCallbackInvocation; + bool writingData; + XrdCksCalcadler32 *cksCalcadler32; }; #endif // __XRD_CEPH_POSIX__ diff --git a/src/XrdCeph/XrdCephXAttr.cc b/src/XrdCeph/XrdCephXAttr.cc index 4c2f60a0f96..b56a6fc5b03 100644 --- a/src/XrdCeph/XrdCephXAttr.cc +++ b/src/XrdCeph/XrdCephXAttr.cc @@ -28,6 +28,9 @@ #include "XrdOuc/XrdOucTrace.hh" #include "XrdCeph/XrdCephXAttr.hh" +#include "XrdCks/XrdCksData.hh" +#include + XrdSysError XrdCephXattrEroute(0); XrdOucTrace XrdCephXattrTrace(&XrdCephXattrEroute); @@ -49,6 +52,7 @@ extern "C" XrdCephXattrEroute.Say("CephXattr loading failed with exception. Check the syntax of parameters : ", parms); return 0; } + return new XrdCephXAttr(); } } @@ -99,16 +103,19 @@ int XrdCephXAttr::List(AList **aPL, const char *Path, int fd, int getSz) { int XrdCephXAttr::Set(const char *Aname, const void *Aval, int Avsz, const char *Path, int fd, int isNew) { + int rc = 0; + if (fd >= 0) { - return ceph_posix_fsetxattr(fd, Aname, Aval, Avsz, 0); + rc = ceph_posix_fsetxattr(fd, Aname, Aval, Avsz, 0); } else { try { - return ceph_posix_setxattr(0, Path, Aname, Aval, Avsz, 0); + rc = ceph_posix_setxattr(0, Path, Aname, Aval, Avsz, 0); } catch (std::exception &e) { XrdCephXattrEroute.Say("Set : invalid syntax in file parameters", Path); - return -EINVAL; + rc = -EINVAL; } } + return rc; } XrdVERSIONINFO(XrdSysGetXAttrObject, XrdCephXAttr);