diff --git a/src/littlefs/__init__.py b/src/littlefs/__init__.py index 6dce6d3..f76ac2a 100644 --- a/src/littlefs/__init__.py +++ b/src/littlefs/__init__.py @@ -58,7 +58,29 @@ class LittleFS: """Littlefs file system""" - def __init__(self, context: Optional["UserContext"] = None, mount=True, **kwargs) -> None: + def __init__( + self, + context: Optional["UserContext"] = None, + mount=True, + filename_encoding: Optional[str] = None, + **kwargs, + ) -> None: + """ + Parameters + ---------- + filename_encoding : Optional[str] + Encoding used to encode/decode filenames passed to and returned by + the filesystem. littlefs stores names as NUL-terminated byte strings, so + any codec that never emits NUL or stray ``/`` bytes works; UTF-16/UTF-32 are unsuitable. + Defaults to :data:`littlefs.lfs.FILENAME_ENCODING` (``"utf-8"``). Set this when reading an image + whose names were written with a different encoding (e.g. ``"latin-1"`` or ``"shift-jis"``). + + Note that littlefs's ``name_max`` limit is measured in *encoded + bytes*, not characters. With a multi-byte encoding such as UTF-8, a + single non-ASCII character consumes 2-4 bytes, so a name can exceed + ``name_max`` (default 255) well before it looks long. 
+ """ + self.filename_encoding = filename_encoding or lfs.FILENAME_ENCODING self.cfg = lfs.LFSConfig(context=context, **kwargs) self.fs = lfs.LFSFilesystem() @@ -204,7 +226,7 @@ def open( buffering = -1 try: - fh = lfs.file_open(self.fs, fname, mode) + fh = lfs.file_open(self.fs, fname, mode, self.filename_encoding) except LittleFSError as e: # Try to map to standard Python exceptions if e.code == LittleFSError.Error.LFS_ERR_NOENT: @@ -251,15 +273,15 @@ def open( def getattr(self, path: str, typ: Union[str, bytes, int]) -> bytes: typ = _typ_to_uint8(typ) - return lfs.getattr(self.fs, path, typ) + return lfs.getattr(self.fs, path, typ, self.filename_encoding) def setattr(self, path: str, typ: Union[str, bytes, int], data: bytes) -> None: typ = _typ_to_uint8(typ) - lfs.setattr(self.fs, path, typ, data) + lfs.setattr(self.fs, path, typ, data, self.filename_encoding) def removeattr(self, path: str, typ: Union[str, bytes, int]) -> None: typ = _typ_to_uint8(typ) - lfs.removeattr(self.fs, path, typ) + lfs.removeattr(self.fs, path, typ, self.filename_encoding) def listdir(self, path=".") -> List[str]: """List directory content @@ -274,7 +296,7 @@ def listdir(self, path=".") -> List[str]: def mkdir(self, path: str) -> int: """Create a new directory""" try: - return lfs.mkdir(self.fs, path) + return lfs.mkdir(self.fs, path, self.filename_encoding) except errors.LittleFSError as e: if e.code == LittleFSError.Error.LFS_ERR_EXIST: msg = "[LittleFSError {:d}] Cannot create a file when that file already exists: '{:s}'.".format( @@ -310,7 +332,7 @@ def remove(self, path: str, recursive: bool = False) -> None: If ``true`` and ``path`` is a directory, recursively remove all children files/folders. 
""" try: - lfs.remove(self.fs, path) + lfs.remove(self.fs, path, self.filename_encoding) return except errors.LittleFSError as e: if e.code == LittleFSError.Error.LFS_ERR_NOENT: @@ -326,7 +348,7 @@ def remove(self, path: str, recursive: bool = False) -> None: # Recursively delete the ``path`` directory for elem in self.scandir(path): self.remove(path + "/" + elem.name, recursive=True) - lfs.remove(self.fs, path) + lfs.remove(self.fs, path, self.filename_encoding) def removedirs(self, name): """Remove directories recursively @@ -351,7 +373,7 @@ def removedirs(self, name): def rename(self, src: str, dst: str) -> int: """Rename a file or directory""" - return lfs.rename(self.fs, src, dst) + return lfs.rename(self.fs, src, dst, self.filename_encoding) def rmdir(self, path: str) -> int: """Remove a directory @@ -362,17 +384,19 @@ def rmdir(self, path: str) -> int: def scandir(self, path=".") -> Iterator["LFSStat"]: """List directory content""" - dh = lfs.dir_open(self.fs, path) - info = lfs.dir_read(self.fs, dh) - while info: - if info.name not in [".", ".."]: - yield info - info = lfs.dir_read(self.fs, dh) - lfs.dir_close(self.fs, dh) + dh = lfs.dir_open(self.fs, path, self.filename_encoding) + try: + info = lfs.dir_read(self.fs, dh, self.filename_encoding) + while info: + if info.name not in [".", ".."]: + yield info + info = lfs.dir_read(self.fs, dh, self.filename_encoding) + finally: + lfs.dir_close(self.fs, dh) def stat(self, path: str) -> "LFSStat": """Get the status of a file or directory""" - return lfs.stat(self.fs, path) + return lfs.stat(self.fs, path, self.filename_encoding) def unlink(self, path: str) -> int: """Remove a file or directory diff --git a/src/littlefs/__main__.py b/src/littlefs/__main__.py index 60778fd..94da199 100644 --- a/src/littlefs/__main__.py +++ b/src/littlefs/__main__.py @@ -30,6 +30,7 @@ def _fs_from_args(args: argparse.Namespace, block_count=None, mount=True, contex "inline_max": args.inline_max, "attr_max": args.attr_max, 
"file_max": args.file_max, + "filename_encoding": getattr(args, "filename_encoding", None), } return LittleFS(context=context, mount=mount, **kwargs) @@ -294,6 +295,15 @@ def get_parser(): default=0, help="Max inline file size; 0 = use library default. Limiting can improve flash usage.", ) + # Host-side encode/decode choice; never stored in the image. The same encoding + # must be used to extract an image as was used to create it, otherwise filenames + # will fail to decode or come out as mojibake. + common_parser.add_argument( + "--filename-encoding", + default=None, + help="Encoding for filenames stored in the image. Defaults to utf-8. " + "Use e.g. latin-1 or shift-jis for images whose names use a different encoding.", + ) subparsers = parser.add_subparsers(required=True, title="Available Commands", dest="command") diff --git a/src/littlefs/lfs.pyi b/src/littlefs/lfs.pyi index b4415c2..25e18c9 100644 --- a/src/littlefs/lfs.pyi +++ b/src/littlefs/lfs.pyi @@ -87,17 +87,19 @@ def mount(fs: LFSFilesystem, cfg: LFSConfig) -> int: ... def unmount(fs: LFSFilesystem) -> int: ... def fs_mkconsistent(fs: LFSFilesystem) -> int: ... def fs_grow(fs: LFSFilesystem, block_count) -> int: ... -def remove(fs: LFSFilesystem, path: str) -> int: ... -def rename(fs: LFSFilesystem, oldpath: str, newpath: str) -> int: ... -def stat(fs: LFSFilesystem, path: str) -> LFSStat: ... +def remove(fs: LFSFilesystem, path: str, filename_encoding: Optional[str] = ...) -> int: ... +def rename(fs: LFSFilesystem, oldpath: str, newpath: str, filename_encoding: Optional[str] = ...) -> int: ... +def stat(fs: LFSFilesystem, path: str, filename_encoding: Optional[str] = ...) -> LFSStat: ... # Attributes -def getattr(fs: LFSFilesystem, path: str, typ) -> bytes: ... -def setattr(fs: LFSFilesystem, path: str, typ, data) -> None: ... -def removeattr(fs: LFSFilesystem, path: str, typ) -> None: ... +def getattr(fs: LFSFilesystem, path: str, typ, filename_encoding: Optional[str] = ...) -> bytes: ... 
+def setattr(fs: LFSFilesystem, path: str, typ, data, filename_encoding: Optional[str] = ...) -> None: ... +def removeattr(fs: LFSFilesystem, path: str, typ, filename_encoding: Optional[str] = ...) -> None: ... # File Handling -def file_open(fs: LFSFilesystem, path: str, flags: Union[str, LFSFileFlag]) -> LFSFile: ... +def file_open( + fs: LFSFilesystem, path: str, flags: Union[str, LFSFileFlag], filename_encoding: Optional[str] = ... +) -> LFSFile: ... # def file_open_cfg(self, path, flags, config): ... def file_close(fs: LFSFilesystem, fh: LFSFile) -> int: ... @@ -111,9 +113,9 @@ def file_rewind(fs: LFSFilesystem, fh: LFSFile) -> int: ... def file_size(fs: LFSFilesystem, fh: LFSFile) -> int: ... # Directory Handling -def mkdir(fs: LFSFilesystem, path: str) -> int: ... -def dir_open(fs: LFSFilesystem, path: str) -> LFSDirectory: ... +def mkdir(fs: LFSFilesystem, path: str, filename_encoding: Optional[str] = ...) -> int: ... +def dir_open(fs: LFSFilesystem, path: str, filename_encoding: Optional[str] = ...) -> LFSDirectory: ... def dir_close(fs: LFSFilesystem, dh: LFSDirectory) -> int: ... -def dir_read(fs: LFSFilesystem, dh: LFSDirectory) -> LFSStat: ... +def dir_read(fs: LFSFilesystem, dh: LFSDirectory, filename_encoding: Optional[str] = ...) -> Optional[LFSStat]: ... def dir_tell(fs: LFSFilesystem, dh: LFSDirectory) -> int: ... def dir_rewind(fs: LFSFilesystem, dh: LFSDirectory) -> int: ... diff --git a/src/littlefs/lfs.pyx b/src/littlefs/lfs.pyx index a230960..eb45965 100644 --- a/src/littlefs/lfs.pyx +++ b/src/littlefs/lfs.pyx @@ -8,8 +8,13 @@ from littlefs.context import UserContext from littlefs import errors -FILENAME_ENCODING = 'ascii' -"""Default filename encoding""" +FILENAME_ENCODING = 'utf-8' +"""Default filename encoding. + +littlefs stores names as NUL-terminated byte strings, so any encoding that never emits +NUL or stray ``/`` bytes works on the C side. UTF-8 is used so that non-ASCII filenames are +supported; since ASCII is a strict subset of UTF-8, existing ASCII names encode/decode identically. 
+""" class LFSStat(NamedTuple): """Littlefs File / Directory status.""" @@ -329,50 +334,56 @@ def fs_grow(LFSFilesystem fs, block_count) -> int: return _raise_on_error(lfs_fs_grow(&fs._impl, block_count)) -def remove(LFSFilesystem fs, path): +def remove(LFSFilesystem fs, path, filename_encoding=None): """Remove a file or directory If removing a directory, the directory must be empty. """ - return _raise_on_error(lfs_remove(&fs._impl, path.encode(FILENAME_ENCODING))) + filename_encoding = filename_encoding or FILENAME_ENCODING + return _raise_on_error(lfs_remove(&fs._impl, path.encode(filename_encoding))) -def rename(LFSFilesystem fs, oldpath, newpath): +def rename(LFSFilesystem fs, oldpath, newpath, filename_encoding=None): """Rename or move a file or directory If the destination exists, it must match the source in type. If the destination is a directory, the directory must be empty. """ - return _raise_on_error(lfs_rename(&fs._impl, oldpath.encode(FILENAME_ENCODING), - newpath.encode(FILENAME_ENCODING))) + filename_encoding = filename_encoding or FILENAME_ENCODING + return _raise_on_error(lfs_rename(&fs._impl, oldpath.encode(filename_encoding), + newpath.encode(filename_encoding))) -def stat(LFSFilesystem fs, path): +def stat(LFSFilesystem fs, path, filename_encoding=None): """Find info about a file or directory""" + filename_encoding = filename_encoding or FILENAME_ENCODING cdef lfs_info * info = malloc(sizeof(lfs_info)) try: - _raise_on_error(lfs_stat(&fs._impl, path.encode(FILENAME_ENCODING), info)) - return LFSStat(info.type, info.size, info.name.decode(FILENAME_ENCODING)) + _raise_on_error(lfs_stat(&fs._impl, path.encode(filename_encoding), info)) + return LFSStat(info.type, info.size, info.name.decode(filename_encoding)) finally: free(info) -def getattr(LFSFilesystem fs, path, typ): +def getattr(LFSFilesystem fs, path, typ, filename_encoding=None): + filename_encoding = filename_encoding or FILENAME_ENCODING buf = bytearray(LFS_ATTR_MAX) cdef unsigned 
char[::1] buf_view = buf - attr_size = _raise_on_error(lfs_getattr(&fs._impl, path.encode(FILENAME_ENCODING), typ, &buf_view[0], LFS_ATTR_MAX)) + attr_size = _raise_on_error(lfs_getattr(&fs._impl, path.encode(filename_encoding), typ, &buf_view[0], LFS_ATTR_MAX)) return bytes(buf[:attr_size]) -def setattr(LFSFilesystem fs, path, typ, data): +def setattr(LFSFilesystem fs, path, typ, data, filename_encoding=None): + filename_encoding = filename_encoding or FILENAME_ENCODING cdef const unsigned char[::1] buf_view = data - _raise_on_error(lfs_setattr(&fs._impl, path.encode(FILENAME_ENCODING), typ, &buf_view[0], len(data))) + _raise_on_error(lfs_setattr(&fs._impl, path.encode(filename_encoding), typ, &buf_view[0], len(data))) -def removeattr(LFSFilesystem fs, path, typ): - _raise_on_error(lfs_removeattr(&fs._impl, path.encode(FILENAME_ENCODING), typ)) +def removeattr(LFSFilesystem fs, path, typ, filename_encoding=None): + filename_encoding = filename_encoding or FILENAME_ENCODING + _raise_on_error(lfs_removeattr(&fs._impl, path.encode(filename_encoding), typ)) -def file_open(LFSFilesystem fs, path, flags): +def file_open(LFSFilesystem fs, path, flags, filename_encoding=None): if isinstance(flags, str): creating = False reading = False @@ -418,8 +429,9 @@ def file_open(LFSFilesystem fs, path, flags): flags |= LFSFileFlag.rdwr flags = int(flags) + filename_encoding = filename_encoding or FILENAME_ENCODING fh = LFSFile() - _raise_on_error(lfs_file_open(&fs._impl, &fh._impl, path.encode(FILENAME_ENCODING), flags)) + _raise_on_error(lfs_file_open(&fs._impl, &fh._impl, path.encode(filename_encoding), flags)) return fh @@ -432,7 +444,7 @@ def file_close(LFSFilesystem fs, LFSFile fh): def file_sync(LFSFilesystem fs, LFSFile fh): - _raise_on_error(lfs_file_sync(&fs._impl, &fh._impl)) + return _raise_on_error(lfs_file_sync(&fs._impl, &fh._impl)) def file_read(LFSFilesystem fs, LFSFile fh, size): @@ -470,24 +482,27 @@ def file_rewind(LFSFilesystem fs, LFSFile fh): def 
file_size(LFSFilesystem fs, LFSFile fh): return _raise_on_error(lfs_file_size(&fs._impl, &fh._impl)) -def mkdir(LFSFilesystem fs, path): - return _raise_on_error(lfs_mkdir(&fs._impl, path.encode(FILENAME_ENCODING))) +def mkdir(LFSFilesystem fs, path, filename_encoding=None): + filename_encoding = filename_encoding or FILENAME_ENCODING + return _raise_on_error(lfs_mkdir(&fs._impl, path.encode(filename_encoding))) -def dir_open(LFSFilesystem fs, path): +def dir_open(LFSFilesystem fs, path, filename_encoding=None): + filename_encoding = filename_encoding or FILENAME_ENCODING handle = LFSDirectory() - _raise_on_error(lfs_dir_open(&fs._impl, &handle._impl, path.encode(FILENAME_ENCODING))) + _raise_on_error(lfs_dir_open(&fs._impl, &handle._impl, path.encode(filename_encoding))) return handle def dir_close(LFSFilesystem fs, LFSDirectory dh): return _raise_on_error(lfs_dir_close(&fs._impl, &dh._impl)) -def dir_read(LFSFilesystem fs, LFSDirectory dh): +def dir_read(LFSFilesystem fs, LFSDirectory dh, filename_encoding=None): + filename_encoding = filename_encoding or FILENAME_ENCODING cdef lfs_info * info = malloc(sizeof(lfs_info)) try: retval = _raise_on_error(lfs_dir_read(&fs._impl, &dh._impl, info)) if retval == 0: return None - return LFSStat(info.type, info.size, info.name.decode(FILENAME_ENCODING)) + return LFSStat(info.type, info.size, info.name.decode(filename_encoding)) finally: free(info) diff --git a/test/cli/test_create_and_extract.py b/test/cli/test_create_and_extract.py index 19c380a..ec8d960 100644 --- a/test/cli/test_create_and_extract.py +++ b/test/cli/test_create_and_extract.py @@ -6,6 +6,53 @@ from littlefs.__main__ import main +def test_filename_encoding_roundtrip(tmp_path, capsys): + """Create an image with a non-UTF-8 filename encoding and list it back. + + "ÿ" is 0xFF in latin-1 but a 2-byte sequence in UTF-8, so the chosen + encoding must be honored on both the create (encode) and list (decode) side. 
+ """ + source_dir = tmp_path / "source" + source_dir.mkdir() + name = "naïveÿ.txt" + (source_dir / name).write_text("hello") + + image_file = tmp_path / "image.bin" + assert ( + main( + [ + "littlefs", "create", str(source_dir), str(image_file), + "--block-size", "512", "--fs-size", "64KB", + "--filename-encoding", "latin-1", + ] + ) + == 0 + ) + + # Listing with the matching encoding round-trips the name. + assert ( + main( + [ + "littlefs", "list", str(image_file), + "--block-size", "512", + "--filename-encoding", "latin-1", + ] + ) + == 0 + ) + assert name in capsys.readouterr().out + + # The on-disk name byte is 0xFF, which is invalid standalone UTF-8, so the + # default-encoding (utf-8) list fails loudly rather than silently mis-decoding. + with pytest.raises(UnicodeDecodeError): + main( + [ + "littlefs", "list", str(image_file), + "--block-size", "512", + ] + ) + + def test_create_and_extract(tmp_path): """Test creating a filesystem image and extracting it.""" # Create test directory with files diff --git a/test/test_unicode_filenames.py b/test/test_unicode_filenames.py new file mode 100644 index 0000000..6821609 --- /dev/null +++ b/test/test_unicode_filenames.py @@ -0,0 +1,101 @@ +import pytest + +from littlefs import LittleFS + +# A mix of Latin-1, CJK and astral-plane (emoji) characters to exercise +# multi-byte UTF-8 sequences. 
+UNICODE_NAMES = ["café.txt", "日本語.bin", "emoji_😀.dat"] + + +@pytest.fixture(scope="function") +def fs(): + yield LittleFS(block_size=128, block_count=64) + + +@pytest.mark.parametrize("name", UNICODE_NAMES) +def test_open_write_read_roundtrip(fs, name): + payload = name.encode("utf-8") + with fs.open(name, "wb") as f: + f.write(payload) + with fs.open(name, "rb") as f: + assert f.read() == payload + + +@pytest.mark.parametrize("name", UNICODE_NAMES) +def test_stat_preserves_name(fs, name): + with fs.open(name, "wb") as f: + f.write(b"x") + assert fs.stat(name).name == name + + +def test_listdir_roundtrips_unicode(fs): + for name in UNICODE_NAMES: + with fs.open(name, "wb") as f: + f.write(b"x") + assert set(fs.listdir("/")) == set(UNICODE_NAMES) + + +@pytest.mark.parametrize("name", UNICODE_NAMES) +def test_mkdir_and_nested_file(fs, name): + fs.mkdir(name) + assert name in fs.listdir("/") + nested = name + "/inner_£.txt" + with fs.open(nested, "wb") as f: + f.write(b"x") + assert fs.stat(nested).name == "inner_£.txt" + + +@pytest.mark.parametrize("name", UNICODE_NAMES) +def test_rename_and_remove(fs, name): + with fs.open(name, "wb") as f: + f.write(b"x") + renamed = "renamed_" + name + fs.rename(name, renamed) + assert renamed in fs.listdir("/") + assert name not in fs.listdir("/") + fs.remove(renamed) + assert renamed not in fs.listdir("/") + + +def test_ascii_names_still_work(fs): + """ASCII is a strict subset of UTF-8: existing names must be unaffected.""" + with fs.open("plain.txt", "wb") as f: + f.write(b"hello") + with fs.open("plain.txt", "rb") as f: + assert f.read() == b"hello" + assert fs.stat("plain.txt").name == "plain.txt" + + +def test_per_instance_encoding_roundtrips(): + """A non-UTF-8 ``filename_encoding`` is honored for both encode and decode.""" + fs = LittleFS(block_size=128, block_count=64, filename_encoding="latin-1") + # "ÿ" is 0xFF in latin-1 but a 2-byte sequence in utf-8. 
+ name = "naïveÿ.txt" + with fs.open(name, "wb") as f: + f.write(b"x") + assert fs.stat(name).name == name + assert name in fs.listdir("/") + fs.rename(name, "renamed_ÿ.txt") + assert "renamed_ÿ.txt" in fs.listdir("/") + + +def test_per_instance_encoding_writes_expected_bytes(): + """The on-disk name bytes reflect the chosen encoding, not the default UTF-8.""" + fs = LittleFS(block_size=128, block_count=64, filename_encoding="latin-1") + with fs.open("ÿ.txt", "wb") as f: + f.write(b"x") + # Re-mount the same backing store with latin-1: names decode cleanly. + same = LittleFS(context=fs.context, filename_encoding="latin-1", block_size=128, block_count=64) + assert "ÿ.txt" in same.listdir("/") + # The byte written was 0xFF, which is invalid standalone UTF-8, so a UTF-8 + # instance over the same store fails loudly rather than silently mis-decoding. + utf8_view = LittleFS(context=fs.context, block_size=128, block_count=64) + with pytest.raises(UnicodeDecodeError): + utf8_view.listdir("/") + + +def test_default_encoding_is_utf8(): + fs = LittleFS(block_size=128, block_count=64) + from littlefs import lfs + + assert fs.filename_encoding == lfs.FILENAME_ENCODING == "utf-8"